pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] jerrypeng closed pull request #2914: Refactor function metrics to use prometheus
Date Sun, 04 Nov 2018 05:14:38 GMT
jerrypeng closed pull request #2914: Refactor function metrics to use prometheus
URL: https://github.com/apache/pulsar/pull/2914
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a PR is merged, the diff
is reproduced below for provenance:

Because this is a foreign pull request (opened from a fork), its diff is
supplied below; GitHub would not otherwise display it:

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index bd6281bfc2..23c3bb8591 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -65,7 +65,6 @@ assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 30000
 # Frequency how often worker performs compaction on function-topics
 topicCompactionFrequencySec: 1800
-metricsSamplingPeriodSec: 60
 
 
 ###############################
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 9d4deb938d..3823cc0b4b 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -467,6 +467,8 @@ The Apache Software License, Version 2.0
     - io.dropwizard.metrics-metrics-core-3.1.0.jar
     - io.dropwizard.metrics-metrics-graphite-3.1.0.jar
     - io.dropwizard.metrics-metrics-jvm-3.1.0.jar
+  * Prometheus
+    - io.prometheus-simpleclient_httpserver-0.5.0.jar
 
 
 BSD 3-clause "New" or "Revised" License
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index f76df6b1d7..2fe61ed439 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -359,7 +359,6 @@ public void testPulsarFunctionStats() throws Exception {
         }, 5, 200);
 
         FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
-        functionRuntimeManager.updateRates();
         FunctionStatusList functionStats = functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
                 functionName, null);
 
diff --git a/pulsar-client-cpp/python/setup.py b/pulsar-client-cpp/python/setup.py
index 304d3450b8..952c57b684 100644
--- a/pulsar-client-cpp/python/setup.py
+++ b/pulsar-client-cpp/python/setup.py
@@ -70,6 +70,6 @@ def build_extension(self, ext):
     license="Apache License v2.0",
     url="http://pulsar.apache.org/",
     install_requires=[
-        'grpcio', 'protobuf'
+        'grpcio', 'protobuf', "prometheus_client"
     ],
 )
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index 8dec8dcdb1..f734b036cd 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -107,6 +107,26 @@
       <artifactId>typetools</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient</artifactId>
+      <version>${prometheus.version}</version>
+    </dependency>
+
+    <!-- Hotspot JVM metrics-->
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_hotspot</artifactId>
+      <version>${prometheus.version}</version>
+    </dependency>
+
+    <!-- Exposition HTTPServer-->
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_httpserver</artifactId>
+      <version>${prometheus.version}</version>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
index f2195de85c..15b01f7703 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
@@ -18,16 +18,15 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import com.google.common.collect.EvictingQueue;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Summary;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Function stats.
  */
@@ -35,132 +34,94 @@
 @Getter
 @Setter
 public class FunctionStats {
+
+    private static final String[] metricsLabelNames = {"tenant", "namespace", "name", "instance_id"};
+
+    /** Declare Prometheus stats **/
+
+    final Counter statTotalProcessed;
+
+    final Counter statTotalProcessedSuccessfully;
+
+    final Counter statTotalSysExceptions;
+
+    final Counter statTotalUserExceptions;
+
+    final Summary statProcessLatency;
+
+    CollectorRegistry functionCollectorRegistry;
+
+    @Getter
+    private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10);
+    @Getter
+    private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10);
+
     @Getter
     @Setter
-    class Stats {
-        private long totalProcessed;
-        private long totalSuccessfullyProcessed;
-        private long totalUserExceptions;
-        private List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions =
-                new LinkedList<>();
-        private long totalSystemExceptions;
-        private List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions =
-                new LinkedList<>();
-        private Map<String, Long> totalDeserializationExceptions = new HashMap<>();
-        private long totalSerializationExceptions;
-        private long totalLatencyMs;
-        private long lastInvocationTime;
-
-        public void incrementProcessed(long processedAt) {
-            totalProcessed++;
-            lastInvocationTime = processedAt;
-        }
-        public void incrementSuccessfullyProcessed(long latency) {
-            totalSuccessfullyProcessed++;
-            totalLatencyMs += latency;
-        }
-        public void incrementUserExceptions(Exception ex) {
-            InstanceCommunication.FunctionStatus.ExceptionInformation info =
-                    InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                    .setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
-            latestUserExceptions.add(info);
-            if (latestUserExceptions.size() > 10) {
-                latestUserExceptions.remove(0);
-            }
-            totalUserExceptions++;
-        }
-        public void incrementSystemExceptions(Exception ex) {
-            InstanceCommunication.FunctionStatus.ExceptionInformation info =
-                    InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                            .setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
-            latestSystemExceptions.add(info);
-            if (latestSystemExceptions.size() > 10) {
-                latestSystemExceptions.remove(0);
-            }
-            totalSystemExceptions++;
-        }
-        public void incrementDeserializationExceptions(String topic) {
-            if (!totalDeserializationExceptions.containsKey(topic)) {
-                totalDeserializationExceptions.put(topic, 0l);
-            }
-            totalDeserializationExceptions.put(topic, totalDeserializationExceptions.get(topic) + 1);
-        }
-        public void incrementSerializationExceptions() { totalSerializationExceptions++; }
-        public void reset() {
-            totalProcessed = 0;
-            totalSuccessfullyProcessed = 0;
-            totalUserExceptions = 0;
-            totalSystemExceptions = 0;
-            totalDeserializationExceptions.clear();
-            totalSerializationExceptions = 0;
-            totalLatencyMs = 0;
-        }
-        public double computeLatency() {
-            if (totalSuccessfullyProcessed <= 0) {
-                return 0;
-            } else {
-                return totalLatencyMs / (double) totalSuccessfullyProcessed;
-            }
-        }
-        
-        public void update(Stats stats) {
-            if (stats == null) {
-                return;
-            }
-            this.totalProcessed = stats.totalProcessed;
-            this.totalSuccessfullyProcessed = stats.totalSuccessfullyProcessed;
-            this.totalUserExceptions = stats.totalUserExceptions;
-            this.latestUserExceptions.clear();
-            this.latestSystemExceptions.clear();
-            this.totalDeserializationExceptions.clear();
-            this.latestUserExceptions.addAll(stats.latestUserExceptions);
-            this.latestSystemExceptions.addAll(stats.latestSystemExceptions);
-            this.totalDeserializationExceptions.putAll(stats.totalDeserializationExceptions);
-            this.totalSystemExceptions = stats.totalSystemExceptions;
-            this.latestSystemExceptions = stats.latestSystemExceptions;
-            this.totalSerializationExceptions = stats.totalSerializationExceptions;
-            this.totalLatencyMs = stats.totalLatencyMs;
-            this.lastInvocationTime = stats.lastInvocationTime;
-        }
-    }
+    private long lastInvocationTime = 0;
 
-    private Stats currentStats;
-    private Stats totalStats;
-    private Stats stats;
-    
     public FunctionStats() {
-        currentStats = new Stats();
-        stats = new Stats();
-        totalStats = new Stats();
-    }
+        // Declare function local collector registry so that it will not clash with other function instances'
+        // metrics collection especially in threaded mode
+        functionCollectorRegistry = new CollectorRegistry();
 
-    public void incrementProcessed(long processedAt) {
-        currentStats.incrementProcessed(processedAt);
-        totalStats.incrementProcessed(processedAt);
-    }
+        statTotalProcessed = Counter.build()
+                .name("__function_total_processed__")
+                .help("Total number of messages processed.")
+                .labelNames(metricsLabelNames)
+                .register(functionCollectorRegistry);
 
-    public void incrementSuccessfullyProcessed(long latency) {
-        currentStats.incrementSuccessfullyProcessed(latency);
-        totalStats.incrementSuccessfullyProcessed(latency);
-    }
-    public void incrementUserExceptions(Exception ex) {
-        currentStats.incrementUserExceptions(ex);
-        totalStats.incrementUserExceptions(ex);
-    }
-    public void incrementSystemExceptions(Exception ex) {
-        currentStats.incrementSystemExceptions(ex);
-        totalStats.incrementSystemExceptions(ex);
+        statTotalProcessedSuccessfully = Counter.build()
+                .name("__function_total_successfully_processed__")
+                .help("Total number of messages processed successfully.")
+                .labelNames(metricsLabelNames)
+                .register(functionCollectorRegistry);
+
+        statTotalSysExceptions = Counter.build()
+                .name("__function_total_system_exceptions__")
+                .help("Total number of system exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(functionCollectorRegistry);
+
+        statTotalUserExceptions = Counter.build()
+                .name("__function_total_user_exceptions__")
+                .help("Total number of user exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(functionCollectorRegistry);
+
+        statProcessLatency = Summary.build()
+                .name("__function_process_latency_ms__").help("Process latency in milliseconds.")
+                .quantile(0.5, 0.01)
+                .quantile(0.9, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(0.999, 0.01)
+                .labelNames(metricsLabelNames)
+                .register(functionCollectorRegistry);
     }
-    public void incrementDeserializationExceptions(String topic) {
-        currentStats.incrementDeserializationExceptions(topic);
-        totalStats.incrementDeserializationExceptions(topic);
+
+    public void addUserException(Exception ex) {
+        InstanceCommunication.FunctionStatus.ExceptionInformation info =
+                    InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                    .setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
+        latestUserExceptions.add(info);
     }
-    public void incrementSerializationExceptions() {
-        currentStats.incrementSerializationExceptions();
-        totalStats.incrementSerializationExceptions();
+
+    public void addSystemException(Throwable ex) {
+        InstanceCommunication.FunctionStatus.ExceptionInformation info =
+                InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                        .setExceptionString(ex.getMessage()).setMsSinceEpoch(System.currentTimeMillis()).build();
+        latestSystemExceptions.add(info);
+
     }
-    public void resetCurrent() {
-        stats.update(currentStats);
-        currentStats.reset();
+
+    public void reset() {
+        statTotalProcessed.clear();
+        statTotalProcessedSuccessfully.clear();
+        statTotalSysExceptions.clear();
+        statTotalUserExceptions.clear();
+        statProcessLatency.clear();
+        latestUserExceptions.clear();
+        latestSystemExceptions.clear();
+        lastInvocationTime = 0;
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index f626c1066d..6e8e79c0bf 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,6 +22,7 @@
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
+import io.prometheus.client.Summary;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -112,14 +113,7 @@
     private Sink sink;
 
     private final SecretsProvider secretsProvider;
-
-    public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
-    public static final String METRICS_TOTAL_SUCCESS = "__total_successfully_processed__";
-    public static final String METRICS_TOTAL_SYS_EXCEPTION = "__total_system_exceptions__";
-    public static final String METRICS_TOTAL_USER_EXCEPTION = "__total_user_exceptions__";
-    public static final String METRICS_TOTAL_DESERIALIZATION_EXCEPTION = "__total_deserialization_exceptions__";
-    public static final String METRICS_TOTAL_SERIALIZATION_EXCEPTION = "__total_serialization_exceptions__";
-    public static final String METRICS_AVG_LATENCY = "__avg_latency_ms__";
+    private final String[] metricsLabels;
 
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
@@ -134,6 +128,12 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.stats = new FunctionStats();
         this.secretsProvider = secretsProvider;
+        this.metricsLabels = new String[]{
+                instanceConfig.getFunctionDetails().getTenant(),
+                instanceConfig.getFunctionDetails().getNamespace(),
+                instanceConfig.getFunctionDetails().getName(),
+                String.valueOf(instanceConfig.getInstanceId())
+        };
     }
 
     /**
@@ -205,23 +205,31 @@ public void run() {
                     }
                 }
 
-                // process the message
-                long processAt = System.currentTimeMillis();
-                stats.incrementProcessed(processAt);
                 addLogTopicHandler();
                 JavaExecutionResult result;
 
+                // set last invocation time
+                stats.setLastInvocationTime(System.currentTimeMillis());
+
+                // start time for process latency stat
+                Summary.Timer requestTimer = stats.statProcessLatency.labels(metricsLabels).startTimer();
+
+                // process the message
                 result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
 
+                // register end time
+                requestTimer.observeDuration();
+                // increment total processed
+                stats.statTotalProcessed.labels(metricsLabels).inc();
+
                 removeLogTopicHandler();
 
-                long doneProcessing = System.currentTimeMillis();
                 if (log.isDebugEnabled()) {
                     log.debug("Got result: {}", result.getResult());
                 }
 
                 try {
-                    processResult(currentRecord, result, processAt, doneProcessing);
+                    processResult(currentRecord, result);
                 } catch (Exception e) {
                     log.warn("Failed to process result of message {}", currentRecord, e);
                     currentRecord.fail();
@@ -230,6 +238,8 @@ public void run() {
         } catch (Throwable t) {
             log.error("[{}] Uncaught exception in Java Instance", functionName, t);
             deathException = t;
+            stats.statTotalSysExceptions.labels(metricsLabels).inc();
+            stats.addSystemException(t);
             return;
         } finally {
             log.info("Closing instance");
@@ -307,18 +317,14 @@ private void setupStateTable() throws Exception {
     }
 
     private void processResult(Record srcRecord,
-                               JavaExecutionResult result,
-                               long startTime, long endTime) throws Exception {
+                               JavaExecutionResult result) throws Exception {
         if (result.getUserException() != null) {
             log.info("Encountered user exception when processing message {}", srcRecord, result.getUserException());
-            stats.incrementUserExceptions(result.getUserException());
+            stats.statTotalUserExceptions.labels(metricsLabels).inc();
+            stats.addUserException(result.getUserException() );
             srcRecord.fail();
-        } else if (result.getSystemException() != null) {
-            log.info("Encountered system exception when processing message {}", srcRecord, result.getSystemException());
-            stats.incrementSystemExceptions(result.getSystemException());
-            throw result.getSystemException();
         } else {
-            stats.incrementSuccessfullyProcessed(endTime - startTime);
+            stats.statTotalProcessedSuccessfully.labels(metricsLabels).inc();
             if (result.getResult() != null) {
                 sendOutputMessage(srcRecord, result.getResult());
             } else {
@@ -398,7 +404,7 @@ public void close() {
 
     public InstanceCommunication.MetricsData getAndResetMetrics() {
         InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
-        stats.resetCurrent();
+        stats.reset();
         if (javaInstance != null) {
             InstanceCommunication.MetricsData userMetrics =  javaInstance.getAndResetMetrics();
             if (userMetrics != null) {
@@ -420,42 +426,39 @@ public void close() {
     }
 
     public void resetMetrics() {
-        stats.resetCurrent();
+        stats.reset();
         javaInstance.resetMetrics();
     }
 
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
-        addSystemMetrics(METRICS_TOTAL_PROCESSED, stats.getStats().getTotalProcessed(), bldr);
-        addSystemMetrics(METRICS_TOTAL_SUCCESS, stats.getStats().getTotalSuccessfullyProcessed(),
+        addSystemMetrics("__total_processed__", stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
+        addSystemMetrics("__total_successfully_processed__", stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
+        addSystemMetrics("__total_system_exceptions__",  stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
+        addSystemMetrics("__total_user_exceptions__", stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
+        addSystemMetrics("__avg_latency_ms__",
+                stats.statProcessLatency.labels(metricsLabels).get().count <= 0.0
+                        ? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count,
                 bldr);
-        addSystemMetrics(METRICS_TOTAL_SYS_EXCEPTION, stats.getStats().getTotalSystemExceptions(), bldr);
-        addSystemMetrics(METRICS_TOTAL_USER_EXCEPTION, stats.getStats().getTotalUserExceptions(), bldr);
-        stats.getStats().getTotalDeserializationExceptions().forEach((topic, count) -> {
-            addSystemMetrics(METRICS_TOTAL_DESERIALIZATION_EXCEPTION + topic, count, bldr);
-        });
-        addSystemMetrics(METRICS_TOTAL_SERIALIZATION_EXCEPTION,
-                stats.getStats().getTotalSerializationExceptions(), bldr);
-        addSystemMetrics(METRICS_AVG_LATENCY, stats.getStats().computeLatency(), bldr);
         return bldr;
     }
 
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
-        functionStatusBuilder.setNumProcessed(stats.getTotalStats().getTotalProcessed());
-        functionStatusBuilder.setNumSuccessfullyProcessed(stats.getTotalStats().getTotalSuccessfullyProcessed());
-        functionStatusBuilder.setNumUserExceptions(stats.getTotalStats().getTotalUserExceptions());
-        stats.getTotalStats().getLatestUserExceptions().forEach(ex -> {
+        functionStatusBuilder.setNumProcessed((long)stats.statTotalProcessed.labels(metricsLabels).get());
+        functionStatusBuilder.setNumSuccessfullyProcessed((long)stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
+        functionStatusBuilder.setNumUserExceptions((long)stats.statTotalUserExceptions.labels(metricsLabels).get());
+        stats.getLatestUserExceptions().forEach(ex -> {
             functionStatusBuilder.addLatestUserExceptions(ex);
         });
-        functionStatusBuilder.setNumSystemExceptions(stats.getTotalStats().getTotalSystemExceptions());
-        stats.getTotalStats().getLatestSystemExceptions().forEach(ex -> {
+        functionStatusBuilder.setNumSystemExceptions((long) stats.statTotalSysExceptions.labels(metricsLabels).get());
+        stats.getLatestSystemExceptions().forEach(ex -> {
             functionStatusBuilder.addLatestSystemExceptions(ex);
         });
-        functionStatusBuilder.putAllDeserializationExceptions(stats.getTotalStats().getTotalDeserializationExceptions());
-        functionStatusBuilder.setSerializationExceptions(stats.getTotalStats().getTotalSerializationExceptions());
-        functionStatusBuilder.setAverageLatency(stats.getTotalStats().computeLatency());
-        functionStatusBuilder.setLastInvocationTime(stats.getTotalStats().getLastInvocationTime());
+        functionStatusBuilder.setAverageLatency(
+                stats.statProcessLatency.labels(metricsLabels).get().count == 0.0
+                        ? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count);
+        functionStatusBuilder.setLastInvocationTime(stats.getLastInvocationTime());
         return functionStatusBuilder;
     }
 
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 24383384f2..37a0cb2353 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -34,6 +34,7 @@
 from functools import partial
 from collections import namedtuple
 from threading import Timer
+from prometheus_client import Counter, Summary
 import traceback
 import sys
 import re
@@ -69,67 +70,50 @@ def base64ify(bytes_or_str):
 
 # We keep track of the following metrics
 class Stats(object):
-  def __init__(self):
-    self.reset()
+  metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id']
+
+  TOTAL_PROCESSED = '__function_total_processed__'
+  TOTAL_SUCCESSFULLY_PROCESSED = '__function_total_successfully_processed__'
+  TOTAL_SYSTEM_EXCEPTIONS = '__function_total_system_exceptions__'
+  TOTAL_USER_EXCEPTIONS = '__function_total_user_exceptions__'
+  PROCESS_LATENCY_MS = '__function_process_latency_ms__'
+
+  # Declare Prometheus
+  stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names)
+  stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
+                                              'Total number of messages processed successfully.', metrics_label_names)
+  stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number of system exceptions.',
+                                      metrics_label_names)
+  stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of user exceptions.',
+                                       metrics_label_names)
+
+  stats_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in milliseconds.', metrics_label_names)
+
+  latest_user_exception = []
+  latest_sys_exception = []
+
+  last_invocation_time = 0.0
+
+  def add_user_exception(self):
+    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() * 1000)))
+    if len(self.latest_sys_exception) > 10:
+      self.latest_sys_exception.pop(0)
+
+  def add_sys_exception(self):
+    self.latest_sys_exception.append((traceback.format_exc(), int(time.time() * 1000)))
+    if len(self.latest_sys_exception) > 10:
+      self.latest_sys_exception.pop(0)
 
   def reset(self):
-    self.nprocessed = 0
-    self.nsuccessfullyprocessed = 0
-    self.nuserexceptions = 0
-    self.latestuserexceptions = []
-    self.nsystemexceptions = 0
-    self.latestsystemexceptions = []
-    self.ndeserialization_exceptions = {}
-    self.nserialization_exceptions = 0
-    self.latency = 0
-    self.lastinvocationtime = 0
-
-  def increment_deser_errors(self, topic):
-    if topic not in self.ndeserialization_exceptions:
-      self.ndeserialization_exceptions[topic] = 0
-    self.ndeserialization_exceptions[topic] += 1
-
-  def increment_successfully_processed(self, latency):
-    self.nsuccessfullyprocessed += 1
-    self.latency += latency
-
-  def increment_processed(self, processed_at):
-    self.nprocessed += 1
-    self.lastinvocationtime = processed_at
-
-  def record_user_exception(self, ex):
-    self.latestuserexceptions.append((traceback.format_exc(), int(time.time() * 1000)))
-    if len(self.latestuserexceptions) > 10:
-      self.latestuserexceptions.pop(0)
-    self.nuserexceptions = self.nuserexceptions + 1
-
-  def record_system_exception(self, ex):
-    self.latestsystemexceptions.append((traceback.format_exc(), int(time.time() * 1000)))
-    if len(self.latestsystemexceptions) > 10:
-      self.latestsystemexceptions.pop(0)
-    self.nsystemexceptions = self.nsystemexceptions + 1
-
-  def compute_latency(self):
-    if self.nsuccessfullyprocessed <= 0:
-      return 0
-    else:
-      return self.latency / self.nsuccessfullyprocessed
-
-  def update(self, object):
-    self.nprocessed = object.nprocessed
-    self.nsuccessfullyprocessed = object.nsuccessfullyprocessed
-    self.nuserexceptions = object.nuserexceptions
-    self.nsystemexceptions = object.nsystemexceptions
-    self.nserialization_exceptions = object.nserialization_exceptions
-    self.latency = object.latency
-    self.lastinvocationtime = object.lastinvocationtime
-    self.latestuserexceptions = []
-    self.latestsystemexceptions = []
-    self.ndeserialization_exceptions.clear()
-    self.latestuserexceptions.append(object.latestuserexceptions)
-    self.latestsystemexceptions.append(object.latestsystemexceptions)
-    self.ndeserialization_exceptions.update(object.ndeserialization_exceptions)
-    
+    self.latest_user_exception.clear()
+    self.latest_sys_exception.clear()
+    self.stat_total_processed._value.set(0.0)
+    self.stat_total_processed_successfully._value.set(0.0)
+    self.stat_total_user_exceptions._value.set(0.0)
+    self.stat_total_sys_exceptions._value.set(0.0)
+    self.stats_process_latency_ms._sum.set(0)
+    self.stats_process_latency_ms._count.set(0);
+    self.last_invocation_time = 0.0
 
 class PythonInstance(object):
   def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client, secrets_provider):
@@ -146,18 +130,17 @@ def __init__(self, instance_id, function_id, function_version, function_details,
     self.function_class = None
     self.function_purefunction = None
     self.producer = None
-    self.exeuction_thread = None
+    self.execution_thread = None
     self.atmost_once = self.instance_config.function_details.processingGuarantees == Function_pb2.ProcessingGuarantees.Value('ATMOST_ONCE')
     self.atleast_once = self.instance_config.function_details.processingGuarantees == Function_pb2.ProcessingGuarantees.Value('ATLEAST_ONCE')
     self.auto_ack = self.instance_config.function_details.autoAck
     self.contextimpl = None
-    self.total_stats = Stats()
-    self.current_stats = Stats()
     self.stats = Stats()
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None
     self.expected_healthcheck_interval = expected_healthcheck_interval
     self.secrets_provider = secrets_provider
+    self.metrics_labels = [function_details.tenant, function_details.namespace, function_details.name, instance_id]
 
   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -229,8 +212,8 @@ def run(self):
 
     self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers, self.secrets_provider)
     # Now launch a thread that does execution
-    self.exeuction_thread = threading.Thread(target=self.actual_execution)
-    self.exeuction_thread.start()
+    self.execution_thread = threading.Thread(target=self.actual_execution)
+    self.execution_thread.start()
 
     # start proccess spawner health check timer
     self.last_health_check_ts = time.time()
@@ -239,49 +222,49 @@ def run(self):
 
   def actual_execution(self):
     Log.debug("Started Thread for executing the function")
+
     while True:
-      msg = self.queue.get(True)
-      if isinstance(msg, InternalQuitMessage):
-        break
-      user_exception = False
-      system_exception = False
-      Log.debug("Got a message from topic %s" % msg.topic)
-      input_object = None
       try:
+        msg = self.queue.get(True)
+        if isinstance(msg, InternalQuitMessage):
+          break
+        Log.debug("Got a message from topic %s" % msg.topic)
+        # deserialize message
         input_object = msg.serde.deserialize(msg.message.data())
-      except:
-        self.current_stats.increment_deser_errors(msg.topic)
-        self.total_stats.increment_deser_errors(msg.topic)
-        continue
-      self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic)
-      output_object = None
-      self.saved_log_handler = None
-      if self.log_topic_handler is not None:
-        self.saved_log_handler = log.remove_all_handlers()
-        log.add_handler(self.log_topic_handler)
-      start_time = time.time()
-      self.current_stats.increment_processed(int(start_time) * 1000)
-      self.total_stats.increment_processed(int(start_time) * 1000)
-      successfully_executed = False
-      try:
-        if self.function_class is not None:
-          output_object = self.function_class.process(input_object, self.contextimpl)
-        else:
-          output_object = self.function_purefunction.process(input_object)
-        successfully_executed = True
+        # set current message in context
+        self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic)
+        output_object = None
+        self.saved_log_handler = None
+        if self.log_topic_handler is not None:
+          self.saved_log_handler = log.remove_all_handlers()
+          log.add_handler(self.log_topic_handler)
+        successfully_executed = False
+        try:
+          # get user function start time for statistic calculation
+          start_time = time.time()
+          self.stats.last_invocation_time = start_time * 1000.0
+          if self.function_class is not None:
+            output_object = self.function_class.process(input_object, self.contextimpl)
+          else:
+            output_object = self.function_purefunction.process(input_object)
+          successfully_executed = True
+          Stats.stats_process_latency_ms.labels(*self.metrics_labels).observe((time.time() - start_time) * 1000.0)
+          Stats.stat_total_processed.labels(*self.metrics_labels).inc()
+        except Exception as e:
+          Log.exception("Exception while executing user method")
+          Stats.stat_total_user_exceptions.labels(*self.metrics_labels).inc()
+          self.stats.add_user_exception()
+
+        if self.log_topic_handler is not None:
+          log.remove_all_handlers()
+          log.add_handler(self.saved_log_handler)
+        if successfully_executed:
+          self.process_result(output_object, msg)
+
       except Exception as e:
-        Log.exception("Exception while executing user method")
-        self.total_stats.record_user_exception(e)
-        self.current_stats.record_user_exception(e)
-      end_time = time.time()
-      latency = (end_time - start_time) * 1000
-      self.total_stats.increment_successfully_processed(latency)
-      self.current_stats.increment_successfully_processed(latency)
-      if self.log_topic_handler is not None:
-        log.remove_all_handlers()
-        log.add_handler(self.saved_log_handler)
-      if successfully_executed:
-        self.process_result(output_object, msg)
+        Log.error("Uncaught exception in Python instance: %s" % e);
+        Stats.stat_total_sys_exceptions.labels(*self.metrics_labels).inc()
+        self.stats.add_sys_exception()
 
   def done_producing(self, consumer, orig_message, result, sent_message):
     if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once:
@@ -289,23 +272,17 @@ def done_producing(self, consumer, orig_message, result, sent_message):
 
   def process_result(self, output, msg):
     if output is not None:
-      output_bytes = None
       if self.output_serde is None:
         self.setup_output_serde()
       if self.producer is None:
         self.setup_producer()
-      try:
-        output_bytes = self.output_serde.serialize(output)
-      except:
-        self.current_stats.nserialization_exceptions += 1
-        self.total_stats.nserialization_exceptions += 1
+
+      # serialize function output
+      output_bytes = self.output_serde.serialize(output)
+
       if output_bytes is not None:
         props = {"__pfn_input_topic__" : str(msg.topic), "__pfn_input_msg_id__" : base64ify(msg.message.message_id().serialize())}
-        try:
-          self.producer.send_async(output_bytes, partial(self.done_producing, msg.consumer, msg.message), properties=props)
-        except Exception as e:
-          self.current_stats.record_system_exception(e)
-          self.total_stats.record_system_exception(e)
+        self.producer.send_async(output_bytes, partial(self.done_producing, msg.consumer, msg.message), properties=props)
     elif self.auto_ack and self.atleast_once:
       msg.consumer.acknowledge(msg.message)
 
@@ -343,25 +320,23 @@ def get_and_reset_metrics(self):
     return metrics
 
   def reset_metrics(self):
-    self.stats.update(self.current_stats)
-    self.current_stats.reset()
+    self.stats.reset()
     self.contextimpl.reset_metrics()
 
   def get_metrics(self):
     # First get any user metrics
     metrics = self.contextimpl.get_metrics()
     # Now add system metrics as well
-    self.add_system_metrics("__total_processed__", self.stats.nprocessed, metrics)
-    self.add_system_metrics("__total_successfully_processed__", self.stats.nsuccessfullyprocessed, metrics)
-    self.add_system_metrics("__total_system_exceptions__", self.stats.nsystemexceptions, metrics)
-    self.add_system_metrics("__total_user_exceptions__", self.stats.nuserexceptions, metrics)
-    for (topic, metric) in self.stats.ndeserialization_exceptions.items():
-      self.add_system_metrics("__total_deserialization_exceptions__" + topic, metric, metrics)
-    self.add_system_metrics("__total_serialization_exceptions__", self.stats.nserialization_exceptions, metrics)
-    self.add_system_metrics("__avg_latency_ms__", self.stats.compute_latency(), metrics)
+    self.add_system_metrics("__total_processed__", Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
+    self.add_system_metrics("__total_successfully_processed__", Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(), metrics)
+    self.add_system_metrics("__total_system_exceptions__", Stats.stat_total_sys_exceptions._value.labels(*self.metrics_labels).get(), metrics)
+    self.add_system_metrics("__total_user_exceptions__", Stats.stat_total_user_exceptions._value.labels(*self.metrics_labels).get(), metrics)
+    self.add_system_metrics("__avg_latency_ms__",
+                            0.0 if Stats.stats_process_latency_ms._count.labels(*self.metrics_labels).get() <= 0.0
+                            else Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() / Stats.stats_process_latency_ms._count.labels(*self.metrics_labels).get(),
+                            metrics)
     return metrics
 
-
   def add_system_metrics(self, metric_name, value, metrics):
     metrics.metrics[metric_name].count = value
     metrics.metrics[metric_name].sum = value
@@ -371,26 +346,25 @@ def add_system_metrics(self, metric_name, value, metrics):
   def get_function_status(self):
     status = InstanceCommunication_pb2.FunctionStatus()
     status.running = True
-    status.numProcessed = self.total_stats.nprocessed
-    status.numSuccessfullyProcessed = self.total_stats.nsuccessfullyprocessed
-    status.numUserExceptions = self.total_stats.nuserexceptions
+    status.numProcessed = long(Stats.stat_total_processed.labels(*self.metrics_labels)._value.get())
+    status.numSuccessfullyProcessed = long(Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get())
+    status.numUserExceptions = long(Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get())
     status.instanceId = self.instance_config.instance_id
-    for ex, tm in self.total_stats.latestuserexceptions:
+    for ex, tm in self.stats.latest_user_exception:
       to_add = status.latestUserExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
-    status.numSystemExceptions = self.total_stats.nsystemexceptions
-    for ex, tm in self.total_stats.latestsystemexceptions:
+    status.numSystemExceptions = long(Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get())
+    for ex, tm in self.stats.latest_sys_exception:
       to_add = status.latestSystemExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
-    for (topic, metric) in self.total_stats.ndeserialization_exceptions.items():
-      status.deserializationExceptions[topic] = metric
-    status.serializationExceptions = self.total_stats.nserialization_exceptions
-    status.averageLatency = self.total_stats.compute_latency()
-    status.lastInvocationTime = self.total_stats.lastinvocationtime
+    status.averageLatency = 0.0 \
+      if Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0 \
+      else Stats.stats_process_latency_ms.labels(*self.metrics_labels)._sum.get() / Stats.stats_process_latency_ms.labels(*self.metrics_labels)._count.get()
+    status.lastInvocationTime = long(self.stats.last_invocation_time)
     return status
 
   def join(self):
     self.queue.put(InternalQuitMessage(True), True)
-    self.exeuction_thread.join()
+    self.execution_thread.join()
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 80503bb82b..c26cb6ed46 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,6 +29,8 @@
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
+import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.DefaultExports;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -186,6 +188,10 @@ public void run() {
                 }
             }
         });
+
+        // registering jvm metrics to prometheus
+        DefaultExports.initialize();
+
         log.info("Starting runtimeSpawner");
         runtimeSpawner.start();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 67a7aada0b..b11a65da2a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -713,26 +713,7 @@ private void startFunctionInstance(Assignment assignment) {
     public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
         return this.functionRuntimeInfoMap;
     }
-    
-    public void updateRates() {
-        if (runtimeFactory.externallyManaged()) {
-            // We don't do metrics management for externally managed functions
-            return;
-        }
-        for (Entry<String, FunctionRuntimeInfo> entry : this.functionRuntimeInfoMap.entrySet()) {
-            RuntimeSpawner functionRuntimeSpawner = entry.getValue().getRuntimeSpawner();
-            if (functionRuntimeSpawner != null) {
-                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
-                if (functionRuntime != null) {
-                    try {
-                        functionRuntime.resetMetrics().get();
-                    } catch (Exception e) {
-                        log.error("Failed to update stats for {}-{}", entry.getKey(), e.getMessage());
-                    }
-                }
-            }
-        }
-    }
+
     /**
      * Private methods for internal use.  Should not be used outside of this class
      */
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 80c1b775fc..e497ff3352 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -58,9 +58,7 @@ public static void generate(WorkerService workerService, String cluster, SimpleT
                     Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
                     if (functionRuntime != null) {
                         try {
-                            InstanceCommunication.MetricsData metrics = workerService.getWorkerConfig()
-                                    .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get()
-                                            : functionRuntime.getAndResetMetrics().get();
+                            InstanceCommunication.MetricsData metrics = functionRuntime.getMetrics().get();
                             for (Map.Entry<String, InstanceCommunication.MetricsData.DataDigest> metricsEntry
                                     : metrics.getMetricsMap().entrySet()) {
                                 String metricName = metricsEntry.getKey();
@@ -75,13 +73,13 @@ public static void generate(WorkerService workerService, String cluster, SimpleT
                                 int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
                                 String qualifiedNamespace = String.format("%s/%s", tenant, namespace);
 
-                                metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%scount", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, String.format("%scount", metricName),
                                         instanceId, dataDigest.getCount());
-                                metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%smax", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, String.format("%smax", metricName),
                                         instanceId, dataDigest.getMax());
-                                metric(out, cluster, qualifiedNamespace,name, String.format("pulsar_function%smin", metricName),
+                                metric(out, cluster, qualifiedNamespace,name, String.format("%smin", metricName),
                                         instanceId, dataDigest.getMin());
-                                metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%ssum", metricName),
+                                metric(out, cluster, qualifiedNamespace, name, String.format("%ssum", metricName),
                                         instanceId, dataDigest.getSum());
 
                             }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 5a95a005ec..4adf0d4c21 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -77,7 +77,6 @@
     private String clientAuthenticationParameters;
     // Frequency how often worker performs compaction on function-topics
     private long topicCompactionFrequencySec = 30 * 60; // 30 minutes
-    private int metricsSamplingPeriodSec = 60;
     /***** --- TLS --- ****/
     // Enable TLS
     private boolean tlsEnabled = false;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 2b9c632479..9d25b646ae 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -184,12 +184,6 @@ public void start(URI dlogUri) throws InterruptedException {
 
             this.connectorsManager = new ConnectorsManager(workerConfig);
 
-            int metricsSamplingPeriodSec = this.workerConfig.getMetricsSamplingPeriodSec();
-            if (metricsSamplingPeriodSec > 0) {
-                this.statsUpdater.scheduleAtFixedRate(() -> this.functionRuntimeManager.updateRates(),
-                        metricsSamplingPeriodSec, metricsSamplingPeriodSec, TimeUnit.SECONDS);
-            }
-
         } catch (Throwable t) {
             log.error("Error Starting up in worker", t);
             throw new RuntimeException(t);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index e5b67b7402..88c0a17f0a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -168,9 +168,7 @@ private Response getFunctionsMetrics() throws IOException {
                 Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
                 if (functionRuntime != null) {
                     try {
-                        InstanceCommunication.MetricsData metricsData = workerService.getWorkerConfig()
-                                .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get()
-                                : functionRuntime.getAndResetMetrics().get();
+                        InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get();
 
                         String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
                                 .getFunctionDetails().getTenant();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index 68a13b4264..e35aa2b90b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -87,10 +87,10 @@ public void testFunctionsStatsGenerate() {
         CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>();
         InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()
                 .putMetrics(
-                        "__total_processed__",
+                        "__function_total_processed__",
                         InstanceCommunication.MetricsData.DataDigest.newBuilder()
                                 .setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
-                .putMetrics("__avg_latency_ms__",
+                .putMetrics("__function_process_latency_ms__",
                         InstanceCommunication.MetricsData.DataDigest.newBuilder()
                                 .setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
                 .build();
@@ -126,56 +126,57 @@ public void testFunctionsStatsGenerate() {
 
         Assert.assertEquals(metrics.size(), 8);
 
-        Metric m = metrics.get("pulsar_function__total_processed__count");
+        System.out.println("metrics: " + metrics);
+        Metric m = metrics.get("__function_total_processed__count");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 100.0);
 
-        m = metrics.get("pulsar_function__total_processed__max");
+        m = metrics.get("__function_total_processed__max");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 200.0);
 
-        m = metrics.get("pulsar_function__total_processed__sum");
+        m = metrics.get("__function_total_processed__sum");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 300.0);
 
-        m = metrics.get("pulsar_function__total_processed__min");
+        m = metrics.get("__function_total_processed__min");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 0.0);
 
-        m = metrics.get("pulsar_function__avg_latency_ms__count");
+        m = metrics.get("__function_process_latency_ms__count");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 10.0);
 
-        m = metrics.get("pulsar_function__avg_latency_ms__max");
+        m = metrics.get("__function_process_latency_ms__max");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 20.0);
 
-        m = metrics.get("pulsar_function__avg_latency_ms__sum");
+        m = metrics.get("__function_process_latency_ms__sum");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 30.0);
 
-        m = metrics.get("pulsar_function__avg_latency_ms__min");
+        m = metrics.get("__function_process_latency_ms__min");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message