pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jerryp...@apache.org
Subject [pulsar] branch master updated: Correcting metrics and adding tests (#3050)
Date Mon, 26 Nov 2018 14:45:14 GMT
This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b721ae3   Correcting metrics and adding tests (#3050)
b721ae3 is described below

commit b721ae3bcacc0d93c259d6f59465a0c0b0d402e0
Author: Boyang Jerry Peng <jerry.boyang.peng@gmail.com>
AuthorDate: Mon Nov 26 09:45:10 2018 -0500

     Correcting metrics and adding tests (#3050)
    
    * Correcting metrics and adding tests
    
    * remove commmented out code
    
    * remove space
    
    * fix integration test
    
    * improving impl
    
    * fix ObjectMapper
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  3 +
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 66 +++++++++++++++-
 .../pulsar/common/policies/data/FunctionStats.java | 64 +++++++++++-----
 .../pulsar/functions/runtime/JavaInstanceMain.java | 10 ---
 .../org/apache/pulsar/functions/worker/Utils.java  |  6 +-
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  3 +
 .../integration/functions/PulsarFunctionsTest.java | 88 ++++++++++++++++++++++
 7 files changed, 207 insertions(+), 33 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 84cf8e0..8a13a8a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -30,6 +30,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -189,6 +190,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/stats")
     public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
                                      final @PathParam("namespace") String namespace,
@@ -205,6 +207,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
     public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(final
@PathParam("tenant") String tenant,
                                              final @PathParam("namespace") String namespace,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 6d89634..f9491f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -353,6 +353,55 @@ public class PulsarFunctionE2ETest {
         // validate pulsar sink consumer has started on the topic
         assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
+        // validate stats are empty
+        FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
+        FunctionStats functionStats = functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+                functionName, null);
+        FunctionStats functionStatsFromAdmin = admin.functions().getFunctionStats(tenant,
namespacePortion,
+                functionName);
+
+        assertEquals(functionStats, functionStatsFromAdmin);
+
+        assertEquals(functionStats.getReceivedTotal(), 0);
+        assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.avgProcessLatency, null);
+        assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
+        assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
+        assertEquals(functionStats.getAvgProcessLatency(), functionStats.oneMin.getAvgProcessLatency());
+        assertEquals(functionStats.getLastInvocation(), null);
+
+        assertEquals(functionStats.instances.size(), 1);
+        assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+        assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
+        assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency, null);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(),
null);
+
+        assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
+        assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.getAvgProcessLatency());
+
+        // validate function instance stats empty
+        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStats
= functionRuntimeManager.getFunctionInstanceStats(tenant, namespacePortion,
+                functionName, 0,  null);
+
+        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsAdmin
= admin.functions().getFunctionStats(tenant, namespacePortion,
+                functionName, 0);
+
+        assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
+        assertEquals(functionInstanceStats, functionStats.instances.get(0).getMetrics());
+
+
         int totalMsgs = 10;
         for (int i = 0; i < totalMsgs; i++) {
             String data = "my-message-" + i;
@@ -367,11 +416,12 @@ public class PulsarFunctionE2ETest {
             }
         }, 5, 200);
 
-        FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
-        FunctionStats functionStats = functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
+
+        // get stats after producing
+        functionStats = functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
                 functionName, null);
         
-        FunctionStats functionStatsFromAdmin = admin.functions().getFunctionStats(tenant,
namespacePortion,
+        functionStatsFromAdmin = admin.functions().getFunctionStats(tenant, namespacePortion,
                 functionName);
 
         assertEquals(functionStats, functionStatsFromAdmin);
@@ -404,6 +454,16 @@ public class PulsarFunctionE2ETest {
 
         assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
         assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(),
functionStats.getAvgProcessLatency());
+
+        // validate function instance stats
+        functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant, namespacePortion,
+                functionName, 0,  null);
+
+        functionInstanceStatsAdmin = admin.functions().getFunctionStats(tenant, namespacePortion,
+                functionName, 0);
+
+        assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
+        assertEquals(functionInstanceStats, functionStats.instances.get(0).getMetrics());
     }
 
     @Test(timeOut = 20000)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
index ba274c1..ff73272 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
@@ -18,17 +18,20 @@
  */
 package org.apache.pulsar.common.policies.data;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import lombok.Data;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
 
 @Data
+@JsonInclude(JsonInclude.Include.ALWAYS)
 @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal",
"userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "instances" })
 public class FunctionStats {
 
@@ -55,7 +58,7 @@ public class FunctionStats {
     /**
      * Average process latency for function
      **/
-    public double avgProcessLatency;
+    public Double avgProcessLatency;
 
     @JsonProperty("1min")
     public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin = new FunctionInstanceStats.FunctionInstanceStatsDataBase();
@@ -63,9 +66,10 @@ public class FunctionStats {
     /**
      * Timestamp of when the function was last invoked by any instance
      **/
-    public long lastInvocation;
+    public Long lastInvocation;
 
     @Data
+    @JsonInclude(JsonInclude.Include.ALWAYS)
     @JsonPropertyOrder({ "instanceId", "metrics" })
     public static class FunctionInstanceStats {
 
@@ -73,6 +77,7 @@ public class FunctionStats {
         public int instanceId;
 
         @Data
+        @JsonInclude(JsonInclude.Include.ALWAYS)
         @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal",
"userExceptionsTotal", "avgProcessLatency" })
         public static class FunctionInstanceStatsDataBase {
             /**
@@ -98,10 +103,11 @@ public class FunctionStats {
             /**
              * Average process latency for function for instance
              **/
-            public double avgProcessLatency;
+            public Double avgProcessLatency;
         }
 
         @Data
+        @JsonInclude(JsonInclude.Include.ALWAYS)
         @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal",
"userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "userMetrics" })
         public static class FunctionInstanceStatsData extends FunctionInstanceStatsDataBase
{
 
@@ -111,7 +117,7 @@ public class FunctionStats {
             /**
              * Timestamp of when the function was last invoked for instance
              **/
-            public long lastInvocation;
+            public Long lastInvocation;
 
             /**
              * Map of user defined metrics
@@ -130,35 +136,59 @@ public class FunctionStats {
 
     public FunctionStats calculateOverall() {
 
-        lastInvocation = 0;
-        instances.forEach(new Consumer<FunctionInstanceStats>() {
-            @Override
-            public void accept(FunctionInstanceStats functionInstanceStats) {
+        int nonNullInstances = 0;
+        int nonNullInstancesOneMin = 0;
+        for (FunctionInstanceStats functionInstanceStats : instances) {
                 FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData
= functionInstanceStats.getMetrics();
                 receivedTotal += functionInstanceStatsData.receivedTotal;
                 processedSuccessfullyTotal += functionInstanceStatsData.processedSuccessfullyTotal;
                 systemExceptionsTotal += functionInstanceStatsData.systemExceptionsTotal;
                 userExceptionsTotal += functionInstanceStatsData.userExceptionsTotal;
-                avgProcessLatency += functionInstanceStatsData.avgProcessLatency;
+                if (functionInstanceStatsData.avgProcessLatency != null) {
+                    if (avgProcessLatency == null) {
+                        avgProcessLatency = 0.0;
+                    }
+                    avgProcessLatency += functionInstanceStatsData.avgProcessLatency;
+                    nonNullInstances ++;
+                }
 
                 oneMin.receivedTotal += functionInstanceStatsData.oneMin.receivedTotal;
                 oneMin.processedSuccessfullyTotal += functionInstanceStatsData.oneMin.processedSuccessfullyTotal;
                 oneMin.systemExceptionsTotal += functionInstanceStatsData.oneMin.systemExceptionsTotal;
                 oneMin.userExceptionsTotal += functionInstanceStatsData.oneMin.userExceptionsTotal;
-                oneMin.avgProcessLatency += functionInstanceStatsData.oneMin.avgProcessLatency;
-
-                if (functionInstanceStatsData.lastInvocation > lastInvocation) {
-                    lastInvocation = functionInstanceStatsData.lastInvocation;
+                if (functionInstanceStatsData.oneMin.avgProcessLatency != null) {
+                    if (oneMin.avgProcessLatency == null) {
+                        oneMin.avgProcessLatency = 0.0;
+                    }
+                    oneMin.avgProcessLatency += functionInstanceStatsData.oneMin.avgProcessLatency;
+                    nonNullInstancesOneMin ++;
                 }
 
+                if (functionInstanceStatsData.lastInvocation != null) {
+                    if (lastInvocation == null || functionInstanceStatsData.lastInvocation
> lastInvocation) {
+                        lastInvocation = functionInstanceStatsData.lastInvocation;
+                    }
+                }
             }
-        });
+
         // calculate average from sum
-        avgProcessLatency = avgProcessLatency / instances.size();
+        if (nonNullInstances > 0) {
+            avgProcessLatency = avgProcessLatency / nonNullInstances;
+        } else {
+            avgProcessLatency = null;
+        }
 
         // calculate 1min average from sum
-        oneMin.avgProcessLatency = oneMin.avgProcessLatency / instances.size();
+        if (nonNullInstancesOneMin > 0) {
+            oneMin.avgProcessLatency = oneMin.avgProcessLatency / nonNullInstancesOneMin;
+        } else {
+            oneMin.avgProcessLatency = null;
+        }
 
         return this;
     }
+
+    public static FunctionStats decode (String json) throws IOException {
+        return ObjectMapperFactory.getThreadLocal().readValue(json, FunctionStats.class);
+    }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index d5fb80e..f0be511 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -29,17 +29,8 @@ import com.google.protobuf.util.JsonFormat;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.HTTPServer;
-import io.prometheus.client.hotspot.BufferPoolsExports;
-import io.prometheus.client.hotspot.ClassLoadingExports;
-import io.prometheus.client.hotspot.DefaultExports;
-import io.prometheus.client.hotspot.GarbageCollectorExports;
-import io.prometheus.client.hotspot.MemoryPoolsExports;
-import io.prometheus.client.hotspot.StandardExports;
-import io.prometheus.client.hotspot.ThreadExports;
-import io.prometheus.client.hotspot.VersionInfoExports;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -59,7 +50,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 /**
  * A function container implemented using java thread.
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index d6863a7..b3663c9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -219,14 +219,14 @@ public final class Utils {
                     functionInstanceStatsData.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal());
                     functionInstanceStatsData.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal());
                     functionInstanceStatsData.setUserExceptionsTotal(metricsData.getUserExceptionsTotal());
-                    functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency());
-                    functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation());
+                    functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency()
== 0.0 ? null : metricsData.getAvgProcessLatency());
+                    functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation()
== 0 ? null : metricsData.getLastInvocation());
 
                     functionInstanceStatsData.oneMin.setReceivedTotal(metricsData.getReceivedTotal1Min());
                     functionInstanceStatsData.oneMin.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal1Min());
                     functionInstanceStatsData.oneMin.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal1Min());
                     functionInstanceStatsData.oneMin.setUserExceptionsTotal(metricsData.getUserExceptionsTotal1Min());
-                    functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min());
+                    functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min()
== 0.0 ? null : metricsData.getAvgProcessLatency1Min());
 
                     // Filter out values that are NaN
                     Map<String, Double> statsDataMap = metricsData.getUserMetricsMap().entrySet().stream()
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index ea23d26..2961afe 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -36,6 +36,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -128,6 +129,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/stats")
     public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
                                      final @PathParam("namespace") String namespace,
@@ -144,6 +146,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
     public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(final
@PathParam("tenant") String tenant,
                                                                                         
         final @PathParam("namespace") String namespace,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5f6f7b1..0bfeb90 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
 
@@ -42,6 +43,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -686,6 +688,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         // get function info
         getFunctionInfoSuccess(functionName);
 
+        // get function stats
+        getFunctionStatsEmpty(functionName);
+
         // publish and consume result
         if (Runtime.JAVA == runtime) {
             // java supports schema
@@ -698,6 +703,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         // get function status
         getFunctionStatus(functionName, numMessages);
 
+        // get function stats
+        getFunctionStats(functionName, numMessages);
+
         // delete function
         deleteFunction(functionName);
 
@@ -800,6 +808,86 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
     }
 
+    private static void getFunctionStatsEmpty(String functionName) throws Exception {
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "functions",
+                "stats",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", functionName
+        );
+
+        log.info("FUNCTION STATS: {}", result.getStdout());
+        FunctionStats functionStats = FunctionStats.decode(result.getStdout());
+
+        assertEquals(functionStats.getReceivedTotal(), 0);
+        assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.avgProcessLatency, null);
+        assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
+        assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
+        assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
+        assertEquals(functionStats.getAvgProcessLatency(), functionStats.oneMin.getAvgProcessLatency());
+        assertEquals(functionStats.getLastInvocation(), null);
+
+        assertEquals(functionStats.instances.size(), 1);
+        assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+        assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
+        assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency, null);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(),
null);
+    }
+
+    private static void getFunctionStats(String functionName, int numMessages) throws Exception
{
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "functions",
+                "stats",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", functionName
+        );
+
+        log.info("FUNCTION STATS: {}", result.getStdout());
+
+        FunctionStats functionStats = FunctionStats.decode(result.getStdout());
+        assertEquals(functionStats.getReceivedTotal(), numMessages);
+        assertEquals(functionStats.getProcessedSuccessfullyTotal(), numMessages);
+        assertEquals(functionStats.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.getUserExceptionsTotal(), 0);
+        assertTrue(functionStats.avgProcessLatency > 0);
+        assertEquals(functionStats.oneMin.getReceivedTotal(), numMessages);
+        assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), numMessages);
+        assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
+        assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
+        assertTrue(functionStats.oneMin.getAvgProcessLatency() > 0);
+        assertEquals(functionStats.getAvgProcessLatency(), functionStats.oneMin.getAvgProcessLatency());
+        assertTrue(functionStats.getLastInvocation() > 0);
+
+        assertEquals(functionStats.instances.size(), 1);
+        assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
+        assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), numMessages);
+        assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(),
numMessages);
+        assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
0);
+        assertTrue(functionStats.instances.get(0).getMetrics().avgProcessLatency > 0);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(),
numMessages);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(),
numMessages);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(),
0);
+        assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(),
0);
+        assertTrue(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency()
> 0);
+    }
+
     private static void getFunctionInfoNotFound(String functionName) throws Exception {
         try {
             pulsarCluster.getAnyWorker().execCmd(


Mime
View raw message