pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] srkukarni closed pull request #3060: Make sure to properly count number of processed messages in python
Date Tue, 27 Nov 2018 18:51:30 GMT
srkukarni closed pull request #3060: Make sure to properly count number of processed messages
in python
URL: https://github.com/apache/pulsar/pull/3060
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index f9491f348e..9214d253db 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -525,10 +525,10 @@ public void testPulsarFunctionStatus() throws Exception {
 
         FunctionStatus stats = functionStatus.getFunctionStatusListList().get(0);
 
-        double count = stats.getNumProcessed();
+        double count = stats.getNumReceived();
         double success = stats.getNumSuccessfullyProcessed();
         String ownerWorkerId = stats.getWorkerId();
-        assertEquals((int) count, totalMsgs);
+        assertEquals((int)count, totalMsgs);
         assertEquals((int) success, totalMsgs);
         assertEquals(ownerWorkerId, workerId);
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 4d664223cc..139208b77f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -46,7 +46,6 @@
     public final static String USER_METRIC_PREFIX = "user_metric_";
 
     /** Declare metric names **/
-    public static final String PROCESSED_TOTAL = "processed_total";
     public static final String PROCESSED_SUCCESSFULLY_TOTAL = "processed_successfully_total";
     public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total";
     public static final String USER_EXCEPTIONS_TOTAL = "user_exceptions_total";
@@ -54,7 +53,6 @@
     public static final String LAST_INVOCATION = "last_invocation";
     public static final String RECEIVED_TOTAL = "received_total";
 
-    public static final String PROCESSED_TOTAL_1min = "processed_total_1min";
     public static final String PROCESSED_SUCCESSFULLY_TOTAL_1min = "processed_successfully_total_1min";
     public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = "system_exceptions_total_1min";
     public static final String USER_EXCEPTIONS_TOTAL_1min = "user_exceptions_total_1min";
@@ -63,8 +61,6 @@
 
     /** Declare Prometheus stats **/
 
-    final Counter statTotalProcessed;
-
     final Counter statTotalProcessedSuccessfully;
 
     final Counter statTotalSysExceptions;
@@ -79,8 +75,6 @@
     
     // windowed metrics
 
-    final Counter statTotalProcessed1min;
-
     final Counter statTotalProcessedSuccessfully1min;
 
     final Counter statTotalSysExceptions1min;
@@ -104,12 +98,6 @@ public FunctionStatsManager(CollectorRegistry collectorRegistry, String[]
metric
 
         this.metricsLabels = metricsLabels;
 
-        statTotalProcessed = Counter.build()
-                .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL)
-                .help("Total number of messages processed.")
-                .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
-
         statTotalProcessedSuccessfully = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL)
                 .help("Total number of messages processed successfully.")
@@ -150,12 +138,6 @@ public FunctionStatsManager(CollectorRegistry collectorRegistry, String[]
metric
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
-        statTotalProcessed1min = Counter.build()
-                .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL_1min)
-                .help("Total number of messages processed in the last 1 minute.")
-                .labelNames(metricsLabelNames)
-                .register(collectorRegistry);
-
         statTotalProcessedSuccessfully1min = Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min)
                 .help("Total number of messages processed successfully in the last 1 minute.")
@@ -222,11 +204,6 @@ public void incrTotalReceived() {
         statTotalRecordsRecieved1min.labels(metricsLabels).inc();
     }
 
-    public void incrTotalProcessed() {
-        statTotalProcessed.labels(metricsLabels).inc();
-        statTotalProcessed1min.labels(metricsLabels).inc();
-    }
-
     public void incrTotalProcessedSuccessfully() {
         statTotalProcessedSuccessfully.labels(metricsLabels).inc();
         statTotalProcessedSuccessfully1min.labels(metricsLabels).inc();
@@ -261,10 +238,6 @@ public void processTimeEnd() {
         }
     }
 
-    public double getTotalProcessed() {
-        return statTotalProcessed.labels(metricsLabels).get();
-    }
-
     public double getTotalProcessedSuccessfully() {
         return statTotalProcessedSuccessfully.labels(metricsLabels).get();
     }
@@ -306,10 +279,6 @@ public double getProcessLatency99_9P() {
         return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.999);
     }
 
-    public double getTotalProcessed1min() {
-        return statTotalProcessed1min.labels(metricsLabels).get();
-    }
-
     public double getTotalProcessedSuccessfully1min() {
         return statTotalProcessedSuccessfully1min.labels(metricsLabels).get();
     }
@@ -348,7 +317,6 @@ public double getProcessLatency99_9P1min() {
     }
 
     public void reset() {
-        statTotalProcessed1min.clear();
         statTotalProcessedSuccessfully1min.clear();
         statTotalSysExceptions1min.clear();
         statTotalUserExceptions1min.clear();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 72dbb9f574..1d70356077 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -255,8 +255,6 @@ public void run() {
 
                 // register end time
                 stats.processTimeEnd();
-                // increment total processed
-                stats.incrTotalProcessed();
 
                 removeLogTopicHandler();
 
@@ -520,7 +518,6 @@ public void resetMetrics() {
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
 
-        bldr.setProcessedTotal((long) stats.getTotalProcessed());
         bldr.setProcessedSuccessfullyTotal((long) stats.getTotalProcessedSuccessfully());
         bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions());
         bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions());
@@ -528,7 +525,6 @@ private Builder createMetricsDataBuilder() {
         bldr.setAvgProcessLatency(stats.getAvgProcessLatency());
         bldr.setLastInvocation((long) stats.getLastInvocation());
 
-        bldr.setProcessedTotal1Min((long) stats.getTotalProcessed1min());
         bldr.setProcessedSuccessfullyTotal1Min((long) stats.getTotalProcessedSuccessfully1min());
         bldr.setSystemExceptionsTotal1Min((long) stats.getTotalSysExceptions1min());
         bldr.setUserExceptionsTotal1Min((long) stats.getTotalUserExceptions1min());
@@ -540,7 +536,7 @@ private Builder createMetricsDataBuilder() {
 
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
-        functionStatusBuilder.setNumProcessed((long) stats.getTotalProcessed());
+        functionStatusBuilder.setNumReceived((long)stats.getTotalRecordsReceived());
         functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
         functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
         stats.getLatestUserExceptions().forEach(ex -> {
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index d6cc8af085..3979d490b7 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -17,7 +17,6 @@
 # under the License.
 #
 
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: Function.proto
 
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index f88e6a3e78..2f7b376c65 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@
   name='InstanceCommunication.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb3\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08
\x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n
\x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b
\x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r
\x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x12\x10\n\x08workerId\x18\x10 \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01
\x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"V\n\x12\x46unctionStatusList\x12\r\n\x05\x65rror\x18\x02
\x01(\t\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\x85\x04\n\x0bMetricsData\x12\x15\n\rreceivedTotal\x18\x02
\x01(\x03\x12\x1a\n\x12receivedTotal_1min\x18\n \x01(\x03\x12\x16\n\x0eprocessedTotal\x18\x03
\x01(\x03\x12\x1b\n\x13processedTotal_1min\x18\x0b \x01(\x03\x12\"\n\x1aprocessedSuccessfullyTotal\x18\x04
\x01(\x03\x12\'\n\x1fprocessedSuccessfullyTotal_1min\x18\x0c \x01(\x03\x12\x1d\n\x15systemExceptionsTotal\x18\x05
\x01(\x03\x12\"\n\x1asystemExceptionsTotal_1min\x18\r \x01(\x03\x12\x1b\n\x13userExceptionsTotal\x18\x06
\x01(\x03\x12 \n\x18userExceptionsTotal_1min\x18\x0e \x01(\x03\x12\x19\n\x11\x61vgProcessLatency\x18\x07
\x01(\x01\x12\x1e\n\x16\x61vgProcessLatency_1min\x18\x0f \x01(\x01\x12\x16\n\x0elastInvocation\x18\x08
\x01(\x03\x12\x38\n\x0buserMetrics\x18\t \x03(\x0b\x32#.proto.MetricsData.UserMetricsEntry\x1a\x32\n\x10UserMetricsEntry\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02 \x01(\x01:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01
\x01(\x08\"\x98\x01\n\x07Metrics\x12/\n\x07metrics\x18\x01 \x03(\x0b\x32\x1e.proto.Metrics.InstanceMetrics\x1a\\\n\x0fInstanceMetrics\x12\x0c\n\x04name\x18\x01
\x01(\t\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\x12\'\n\x0bmetricsData\x18\x03 \x01(\x0b\x32\x12.proto.MetricsData2\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
+  serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb2\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x13\n\x0bnumReceived\x18\x11 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08
\x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n
\x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b
\x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r
\x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x12\x10\n\x08workerId\x18\x10 \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01
\x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"V\n\x12\x46unctionStatusList\x12\r\n\x05\x65rror\x18\x02
\x01(\t\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xd0\x03\n\x0bMetricsData\x12\x15\n\rreceivedTotal\x18\x02
\x01(\x03\x12\x1a\n\x12receivedTotal_1min\x18\n \x01(\x03\x12\"\n\x1aprocessedSuccessfullyTotal\x18\x04
\x01(\x03\x12\'\n\x1fprocessedSuccessfullyTotal_1min\x18\x0c \x01(\x03\x12\x1d\n\x15systemExceptionsTotal\x18\x05
\x01(\x03\x12\"\n\x1asystemExceptionsTotal_1min\x18\r \x01(\x03\x12\x1b\n\x13userExceptionsTotal\x18\x06
\x01(\x03\x12 \n\x18userExceptionsTotal_1min\x18\x0e \x01(\x03\x12\x19\n\x11\x61vgProcessLatency\x18\x07
\x01(\x01\x12\x1e\n\x16\x61vgProcessLatency_1min\x18\x0f \x01(\x01\x12\x16\n\x0elastInvocation\x18\x08
\x01(\x03\x12\x38\n\x0buserMetrics\x18\t \x03(\x0b\x32#.proto.MetricsData.UserMetricsEntry\x1a\x32\n\x10UserMetricsEntry\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02 \x01(\x01:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01
\x01(\x08\"\x98\x01\n\x07Metrics\x12/\n\x07metrics\x18\x01 \x03(\x0b\x32\x1e.proto.Metrics.InstanceMetrics\x1a\\\n\x0fInstanceMetrics\x12\x0c\n\x04name\x18\x01
\x01(\t\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\x12\'\n\x0bmetricsData\x18\x03 \x01(\x0b\x32\x12.proto.MetricsData2\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -79,8 +79,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=624,
-  serialized_end=693,
+  serialized_start=623,
+  serialized_end=692,
 )
 
 _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
@@ -116,8 +116,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=695,
-  serialized_end=759,
+  serialized_start=694,
+  serialized_end=758,
 )
 
 _FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -149,8 +149,8 @@
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='numProcessed', full_name='proto.FunctionStatus.numProcessed', index=3,
-      number=4, type=3, cpp_type=2, label=1,
+      name='numReceived', full_name='proto.FunctionStatus.numReceived', index=3,
+      number=17, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -245,7 +245,7 @@
   oneofs=[
   ],
   serialized_start=68,
-  serialized_end=759,
+  serialized_end=758,
 )
 
 
@@ -282,8 +282,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=761,
-  serialized_end=847,
+  serialized_start=760,
+  serialized_end=846,
 )
 
 
@@ -320,8 +320,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1317,
-  serialized_end=1367,
+  serialized_start=1263,
+  serialized_end=1313,
 )
 
 _METRICSDATA = _descriptor.Descriptor(
@@ -346,84 +346,70 @@
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='processedTotal', full_name='proto.MetricsData.processedTotal', index=2,
-      number=3, type=3, cpp_type=2, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='processedTotal_1min', full_name='proto.MetricsData.processedTotal_1min', index=3,
-      number=11, type=3, cpp_type=2, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='processedSuccessfullyTotal', full_name='proto.MetricsData.processedSuccessfullyTotal',
index=4,
+      name='processedSuccessfullyTotal', full_name='proto.MetricsData.processedSuccessfullyTotal',
index=2,
       number=4, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='processedSuccessfullyTotal_1min', full_name='proto.MetricsData.processedSuccessfullyTotal_1min',
index=5,
+      name='processedSuccessfullyTotal_1min', full_name='proto.MetricsData.processedSuccessfullyTotal_1min',
index=3,
       number=12, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='systemExceptionsTotal', full_name='proto.MetricsData.systemExceptionsTotal',
index=6,
+      name='systemExceptionsTotal', full_name='proto.MetricsData.systemExceptionsTotal',
index=4,
       number=5, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='systemExceptionsTotal_1min', full_name='proto.MetricsData.systemExceptionsTotal_1min',
index=7,
+      name='systemExceptionsTotal_1min', full_name='proto.MetricsData.systemExceptionsTotal_1min',
index=5,
       number=13, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='userExceptionsTotal', full_name='proto.MetricsData.userExceptionsTotal', index=8,
+      name='userExceptionsTotal', full_name='proto.MetricsData.userExceptionsTotal', index=6,
       number=6, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='userExceptionsTotal_1min', full_name='proto.MetricsData.userExceptionsTotal_1min',
index=9,
+      name='userExceptionsTotal_1min', full_name='proto.MetricsData.userExceptionsTotal_1min',
index=7,
       number=14, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='avgProcessLatency', full_name='proto.MetricsData.avgProcessLatency', index=10,
+      name='avgProcessLatency', full_name='proto.MetricsData.avgProcessLatency', index=8,
       number=7, type=1, cpp_type=5, label=1,
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='avgProcessLatency_1min', full_name='proto.MetricsData.avgProcessLatency_1min',
index=11,
+      name='avgProcessLatency_1min', full_name='proto.MetricsData.avgProcessLatency_1min',
index=9,
       number=15, type=1, cpp_type=5, label=1,
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='lastInvocation', full_name='proto.MetricsData.lastInvocation', index=12,
+      name='lastInvocation', full_name='proto.MetricsData.lastInvocation', index=10,
       number=8, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='userMetrics', full_name='proto.MetricsData.userMetrics', index=13,
+      name='userMetrics', full_name='proto.MetricsData.userMetrics', index=11,
       number=9, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
@@ -441,8 +427,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=850,
-  serialized_end=1367,
+  serialized_start=849,
+  serialized_end=1313,
 )
 
 
@@ -472,8 +458,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1369,
-  serialized_end=1405,
+  serialized_start=1315,
+  serialized_end=1351,
 )
 
 
@@ -517,8 +503,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1468,
-  serialized_end=1560,
+  serialized_start=1414,
+  serialized_end=1506,
 )
 
 _METRICS = _descriptor.Descriptor(
@@ -547,8 +533,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1408,
-  serialized_end=1560,
+  serialized_start=1354,
+  serialized_end=1506,
 )
 
 _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
@@ -650,8 +636,8 @@
   file=DESCRIPTOR,
   index=0,
   options=None,
-  serialized_start=1563,
-  serialized_end=1911,
+  serialized_start=1509,
+  serialized_end=1857,
   methods=[
   _descriptor.MethodDescriptor(
     name='GetFunctionStatus',
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py
index f720a41fe1..3d6e216a04 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -30,7 +30,6 @@ class Stats(object):
   PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
   USER_METRIC_PREFIX = "user_metric_";
 
-  TOTAL_PROCESSED = 'processed_total'
   TOTAL_SUCCESSFULLY_PROCESSED = 'processed_successfully_total'
   TOTAL_SYSTEM_EXCEPTIONS = 'system_exceptions_total'
   TOTAL_USER_EXCEPTIONS = 'user_exceptions_total'
@@ -38,7 +37,6 @@ class Stats(object):
   LAST_INVOCATION = 'last_invocation'
   TOTAL_RECEIVED = 'received_total'
 
-  TOTAL_PROCESSED_1min = 'processed_total_1min'
   TOTAL_SUCCESSFULLY_PROCESSED_1min = 'processed_successfully_total_1min'
   TOTAL_SYSTEM_EXCEPTIONS_1min = 'system_exceptions_total_1min'
   TOTAL_USER_EXCEPTIONS_1min = 'user_exceptions_total_1min'
@@ -46,7 +44,6 @@ class Stats(object):
   TOTAL_RECEIVED_1min = 'received_total_1min'
 
   # Declare Prometheus
-  stat_total_processed = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_PROCESSED, 'Total
number of messages processed.', metrics_label_names)
   stat_total_processed_successfully = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED,
                                               'Total number of messages processed successfully.',
metrics_label_names)
   stat_total_sys_exceptions = Counter(PULSAR_FUNCTION_METRICS_PREFIX+ TOTAL_SYSTEM_EXCEPTIONS,
'Total number of system exceptions.',
@@ -62,8 +59,6 @@ class Stats(object):
 
 
   # 1min windowed metrics
-  stat_total_processed_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_PROCESSED_1min,
-                                 'Total number of messages processed in the last 1 minute.',
metrics_label_names)
   stat_total_processed_successfully_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED_1min,
                                               'Total number of messages processed successfully
in the last 1 minute.', metrics_label_names)
   stat_total_sys_exceptions_1min = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SYSTEM_EXCEPTIONS_1min,
@@ -92,9 +87,6 @@ def __init__(self, metrics_labels):
   def get_total_received(self):
     return self.stat_total_received.labels(*self.metrics_labels)._value.get();
 
-  def get_total_processed(self):
-    return self.stat_total_processed.labels(*self.metrics_labels)._value.get();
-
   def get_total_processed_successfully(self):
     return self.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get();
 
@@ -111,20 +103,17 @@ def get_avg_process_latency(self):
       if process_latency_ms_count <= 0.0 \
       else process_latency_ms_sum / process_latency_ms_count
 
-  def get_total_received_1min(self):
-    return self.stat_total_received_1min.labels(*self.metrics_labels)._value.get();
-
-  def get_total_processed_1min(self):
-    return self.stat_total_processed_1min.labels(*self.metrics_labels)._value.get();
-
   def get_total_processed_successfully_1min(self):
-    return self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get();
+    return self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get()
 
   def get_total_sys_exceptions_1min(self):
-    return self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get();
+    return self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get()
 
   def get_total_user_exceptions_1min(self):
-    return self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get();
+    return self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get()
+
+  def get_total_received_1min(self):
+    return self.stat_total_received_1min.labels(*self.metrics_labels)._value.get()
 
   def get_avg_process_latency_1min(self):
     process_latency_ms_count = self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.get()
@@ -136,10 +125,6 @@ def get_avg_process_latency_1min(self):
   def get_last_invocation(self):
     return self.stat_last_invocation.labels(*self.metrics_labels)._value.get()
 
-  def incr_total_processed(self):
-    self.stat_total_processed.labels(*self.metrics_labels).inc()
-    self.stat_total_processed_1min.labels(*self.metrics_labels).inc()
-
   def incr_total_processed_successfully(self):
     self.stat_total_processed_successfully.labels(*self.metrics_labels).inc()
     self.stat_total_processed_successfully_1min.labels(*self.metrics_labels).inc()
@@ -183,7 +168,6 @@ def add_sys_exception(self):
   def reset(self):
     self.latest_user_exception = []
     self.latest_sys_exception = []
-    self.stat_total_processed_1min.labels(*self.metrics_labels)._value.set(0.0)
     self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.set(0.0)
     self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
     self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index ec85af1811..0fc3601a6b 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -211,12 +211,9 @@ def actual_execution(self):
 
           # stop timer for process time
           self.stats.process_time_end()
-
-          # incr total processed stat
-          self.stats.incr_total_processed()
         except Exception as e:
           Log.exception("Exception while executing user method")
-          self.stats.incr_total_user_exceptions();
+          self.stats.incr_total_user_exceptions()
 
         if self.log_topic_handler is not None:
           log.remove_all_handlers()
@@ -292,7 +289,6 @@ def reset_metrics(self):
   def get_metrics(self):
 
     total_received =  self.stats.get_total_received()
-    total_processed = self.stats.get_total_processed()
     total_processed_successfully = self.stats.get_total_processed_successfully()
     total_user_exceptions = self.stats.get_total_user_exceptions()
     total_sys_exceptions = self.stats.get_total_sys_exceptions()
@@ -300,7 +296,6 @@ def get_metrics(self):
     last_invocation = self.stats.get_last_invocation()
 
     total_received_1min = self.stats.get_total_received_1min()
-    total_processed_1min = self.stats.get_total_processed_1min()
     total_processed_successfully_1min = self.stats.get_total_processed_successfully_1min()
     total_user_exceptions_1min = self.stats.get_total_user_exceptions_1min()
     total_sys_exceptions_1min = self.stats.get_total_sys_exceptions_1min()
@@ -309,7 +304,6 @@ def get_metrics(self):
     metrics_data = InstanceCommunication_pb2.MetricsData()
     # total metrics
     metrics_data.receivedTotal = int(total_received) if sys.version_info.major >= 3 else
long(total_received)
-    metrics_data.processedTotal = int(total_processed) if sys.version_info.major >= 3
else long(total_processed)
     metrics_data.processedSuccessfullyTotal = int(total_processed_successfully) if sys.version_info.major
>= 3 else long(total_processed_successfully)
     metrics_data.systemExceptionsTotal = int(total_sys_exceptions) if sys.version_info.major
>= 3 else long(total_sys_exceptions)
     metrics_data.userExceptionsTotal = int(total_user_exceptions) if sys.version_info.major
>= 3 else long(total_user_exceptions)
@@ -317,7 +311,6 @@ def get_metrics(self):
     metrics_data.lastInvocation = int(last_invocation) if sys.version_info.major >= 3
else long(last_invocation)
     # 1min metrics
     metrics_data.receivedTotal_1min = int(total_received_1min) if sys.version_info.major
>= 3 else long(total_received_1min)
-    metrics_data.processedTotal_1min = int(total_processed_1min) if sys.version_info.major
>= 3 else long(total_processed_1min)
     metrics_data.processedSuccessfullyTotal_1min = int(
       total_processed_successfully_1min) if sys.version_info.major >= 3 else long(total_processed_successfully_1min)
     metrics_data.systemExceptionsTotal_1min = int(total_sys_exceptions_1min) if sys.version_info.major
>= 3 else long(
@@ -343,14 +336,14 @@ def get_function_status(self):
     status = InstanceCommunication_pb2.FunctionStatus()
     status.running = True
 
-    total_processed = self.stats.get_total_processed()
+    total_received = self.stats.get_total_received()
     total_processed_successfully = self.stats.get_total_processed_successfully()
     total_user_exceptions = self.stats.get_total_user_exceptions()
     total_sys_exceptions = self.stats.get_total_sys_exceptions()
     avg_process_latency_ms = self.stats.get_avg_process_latency()
     last_invocation = self.stats.get_last_invocation()
 
-    status.numProcessed = int(total_processed) if sys.version_info.major >= 3 else long(total_processed)
+    status.numReceived = int(total_received) if sys.version_info.major >= 3 else long(total_received)
     status.numSuccessfullyProcessed = int(total_processed_successfully) if sys.version_info.major
>= 3 else long(total_processed_successfully)
     status.numUserExceptions = int(total_user_exceptions) if sys.version_info.major >=
3 else long(total_user_exceptions)
     status.instanceId = self.instance_config.instance_id
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 05d13992fd..e5359f8241 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -32,7 +32,8 @@ message FunctionStatus {
     bool running = 1;
     string failureException = 2;
     int64 numRestarts = 3;
-    int64 numProcessed = 4;
+    // int64 numProcessed = 4;
+    int64 numReceived = 17;
     int64 numSuccessfullyProcessed = 5;
     int64 numUserExceptions = 6;
     repeated ExceptionInformation latestUserExceptions = 7;
@@ -74,9 +75,10 @@ message MetricsData {
     int64 receivedTotal_1min = 10;
 
     // Total number of records processed
-    int64 processedTotal = 3;
+    // No longer used because processedSuccessfullyTotal and userExceptionsTotal add to it
+    // int64 processedTotal = 3;
 
-    int64 processedTotal_1min = 11;
+    // int64 processedTotal_1min = 11;
 
     // Total number of records successfully processed by user function
     int64 processedSuccessfullyTotal = 4;
@@ -124,4 +126,4 @@ message Metrics {
 		MetricsData metricsData = 3;
 	}
 	repeated InstanceMetrics metrics = 1;
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 77228ac224..2b8e0fd595 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -71,7 +71,6 @@ public static void generate(WorkerService workerService, String cluster,
SimpleT
                             metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX
+ FunctionStatsManager.PROCESS_LATENCY_MS, instanceId, metrics.getAvgProcessLatency());
                             metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX
+ FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation());
                             metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX
+ FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId, metrics.getProcessedSuccessfullyTotal());
-                            metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX
+ FunctionStatsManager.PROCESSED_TOTAL, instanceId, metrics.getProcessedTotal());
                             metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX
+ FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal());
                             metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX
+ FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId, metrics.getSystemExceptionsTotal());
                             metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX
+ FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId, metrics.getUserExceptionsTotal());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index efc2974e5a..4b54bcfc98 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -88,7 +88,6 @@ public void testFunctionsStatsGenerate() {
         CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture
= new CompletableFuture<>();
         InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()
                 .setReceivedTotal(101)
-                .setProcessedTotal(100)
                 .setProcessedSuccessfullyTotal(99)
                 .setAvgProcessLatency(10.0)
                 .setUserExceptionsTotal(3)
@@ -126,7 +125,7 @@ public void testFunctionsStatsGenerate() {
         buf.release();
         Map<String, Metric> metrics = parseMetrics(str);
 
-        Assert.assertEquals(metrics.size(), 7);
+        Assert.assertEquals(metrics.size(), 6);
 
         System.out.println("metrics: " + metrics);
         Metric m = metrics.get("pulsar_function_received_total");
@@ -136,13 +135,6 @@ public void testFunctionsStatsGenerate() {
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 101.0);
 
-        m = metrics.get("pulsar_function_processed_total");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 100.0);
-
         m = metrics.get("pulsar_function_user_exceptions_total");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0bfeb90882..e72963ecd2 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -333,7 +333,7 @@ protected void waitForProcessingMessages(String tenant,
             try {
                 ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get sink status : {}", result.getStdout());
-                if (result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""))
{
+                if (result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages
+ "\"")) {
                     return;
                 }
             } catch (ContainerExecException e) {
@@ -532,7 +532,7 @@ protected void waitForProcessingSourceMessages(String tenant,
             try {
                 ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get source status : {}", result.getStdout());
-                if (result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""))
{
+                if (result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages
+ "\"")) {
                     return;
                 }
             } catch (ContainerExecException e) {
@@ -561,7 +561,7 @@ protected void waitForProcessingSinkMessages(String tenant,
             try {
                 ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
                 log.info("Get sink status : {}", result.getStdout());
-                if (result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""))
{
+                if (result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages
+ "\"")) {
                     return;
                 }
             } catch (ContainerExecException e) {
@@ -913,7 +913,6 @@ private static void getFunctionStatus(String functionName, int numMessages)
thro
             "--name", functionName
         );
         assertTrue(result.getStdout().contains("\"running\": true"));
-        assertTrue(result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""));
         assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages
+ "\""));
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message