pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] rdhabalia closed pull request #2128: Schedule task to update function stats separately
Date Thu, 12 Jul 2018 21:54:51 GMT
rdhabalia closed pull request #2128: Schedule task to update function stats separately
URL: https://github.com/apache/incubator-pulsar/pull/2128
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged, the diff is
reproduced below for provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not display it otherwise:

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 73edbfbb7c..58bcf1dfd3 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -44,3 +44,4 @@ rescheduleTimeoutMs: 60000
 initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 30000
+metricsSamplingPeriodSec: 60
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 0a1b96503c..b41d6f48ab 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -91,6 +91,7 @@ public void update(double value) {
         }
     }
 
+    private ConcurrentMap<String, AccumulatedMetricDatum> currentAccumulatedMetrics;
     private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;
 
     private Map<String, Producer> publishProducers;
@@ -110,6 +111,7 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
         this.logger = logger;
         this.pulsarClient = client;
         this.classLoader = classLoader;
+        this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
         this.accumulatedMetrics = new ConcurrentHashMap<>();
         this.publishProducers = new HashMap<>();
         this.publishSerializers = new HashMap<>();
@@ -324,11 +326,23 @@ public ByteBuffer getState(String key) {
 
     @Override
     public void recordMetric(String metricName, double value) {
-        accumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum());
-        accumulatedMetrics.get(metricName).update(value);
+        // One atomic call: a concurrent resetMetrics() can clear the map between putIfAbsent() and get().
+        currentAccumulatedMetrics.computeIfAbsent(metricName, k -> new AccumulatedMetricDatum()).update(value);
     }
 
     public MetricsData getAndResetMetrics() {
+        MetricsData retval = getMetrics();
+        resetMetrics();
+        return retval;
+    }
+
+    public void resetMetrics() {
+        this.accumulatedMetrics.clear();
+        this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
+        this.currentAccumulatedMetrics.clear();
+    }
+    
+    public MetricsData getMetrics() {
         MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder();
         for (String metricName : accumulatedMetrics.keySet()) {
             MetricsData.DataDigest.Builder bldr = MetricsData.DataDigest.newBuilder();
@@ -339,7 +353,6 @@ public MetricsData getAndResetMetrics() {
             metricsDataBuilder.putMetrics(metricName, bldr.build());
         }
         MetricsData retval = metricsDataBuilder.build();
-        accumulatedMetrics.clear();
         return retval;
     }
 }
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
index 899777c46f..c45837bafd 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
@@ -102,13 +102,35 @@ public double computeLatency() {
                 return totalLatencyMs / totalSuccessfullyProcessed;
             }
         }
+        
+        /** Copies {@code stats} into this instance (collections are copied, not aliased); no-op if null. */
+        public void update(Stats stats) {
+            if (stats == null) {
+                return;
+            }
+            this.totalProcessed = stats.totalProcessed;
+            this.totalSuccessfullyProcessed = stats.totalSuccessfullyProcessed;
+            this.totalUserExceptions = stats.totalUserExceptions;
+            this.totalSystemExceptions = stats.totalSystemExceptions;
+            this.totalSerializationExceptions = stats.totalSerializationExceptions;
+            this.totalLatencyMs = stats.totalLatencyMs;
+            this.lastInvocationTime = stats.lastInvocationTime;
+            this.latestUserExceptions.clear();
+            this.latestUserExceptions.addAll(stats.latestUserExceptions);
+            this.latestSystemExceptions.clear();
+            this.latestSystemExceptions.addAll(stats.latestSystemExceptions);
+            this.totalDeserializationExceptions.clear();
+            this.totalDeserializationExceptions.putAll(stats.totalDeserializationExceptions);
+        }
     }
 
     private Stats currentStats;
     private Stats totalStats;
-
+    private Stats stats;
+    
     public FunctionStats() {
         currentStats = new Stats();
+        stats = new Stats();
         totalStats = new Stats();
     }
 
@@ -138,6 +160,7 @@ public void incrementSerializationExceptions() {
         totalStats.incrementSerializationExceptions();
     }
     public void resetCurrent() {
+        stats.update(currentStats);
         currentStats.reset();
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 496b43f109..42ade3ffd1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -76,4 +76,12 @@ public void close() {
     public InstanceCommunication.MetricsData getAndResetMetrics() {
         return context.getAndResetMetrics();
     }
+
+    public void resetMetrics() {
+        context.resetMetrics();
+    }
+
+    public InstanceCommunication.MetricsData getMetrics() {
+        return context.getMetrics();
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d2f832d386..ba0de385b2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -52,9 +52,9 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.sink.PulsarSink;
@@ -389,16 +389,7 @@ public void close() {
     }
 
     public InstanceCommunication.MetricsData getAndResetMetrics() {
-        InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
-        addSystemMetrics("__total_processed__", stats.getCurrentStats().getTotalProcessed(),
bldr);
-        addSystemMetrics("__total_successfully_processed__", stats.getCurrentStats().getTotalSuccessfullyProcessed(),
bldr);
-        addSystemMetrics("__total_system_exceptions__", stats.getCurrentStats().getTotalSystemExceptions(),
bldr);
-        addSystemMetrics("__total_user_exceptions__", stats.getCurrentStats().getTotalUserExceptions(),
bldr);
-        stats.getCurrentStats().getTotalDeserializationExceptions().forEach((topic, count)
-> {
-            addSystemMetrics("__total_deserialization_exceptions__" + topic, count, bldr);
-        });
-        addSystemMetrics("__total_serialization_exceptions__", stats.getCurrentStats().getTotalSerializationExceptions(),
bldr);
-        addSystemMetrics("__avg_latency_ms__", stats.getCurrentStats().computeLatency(),
bldr);
+        InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
         stats.resetCurrent();
         if (javaInstance != null) {
             InstanceCommunication.MetricsData userMetrics =  javaInstance.getAndResetMetrics();
@@ -409,6 +400,42 @@ public void close() {
         return bldr.build();
     }
 
+    /** Returns the system metrics snapshot plus any user-recorded metrics, without resetting them. */
+    public InstanceCommunication.MetricsData getMetrics() {
+        InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
+        if (javaInstance != null) {
+            InstanceCommunication.MetricsData userMetrics = javaInstance.getMetrics();
+            if (userMetrics != null) {
+                bldr.putAllMetrics(userMetrics.getMetricsMap());
+            }
+        }
+        return bldr.build();
+    }
+
+    /** Snapshots and resets the system counters; user metrics are skipped while javaInstance is null. */
+    public void resetMetrics() {
+        stats.resetCurrent();
+        if (javaInstance != null) {
+            javaInstance.resetMetrics();
+        }
+    }
+    
+    private Builder createMetricsDataBuilder() {
+        InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
+        addSystemMetrics("__total_processed__", stats.getStats().getTotalProcessed(), bldr);
+        addSystemMetrics("__total_successfully_processed__", stats.getStats().getTotalSuccessfullyProcessed(),
+                bldr);
+        addSystemMetrics("__total_system_exceptions__", stats.getStats().getTotalSystemExceptions(),
bldr);
+        addSystemMetrics("__total_user_exceptions__", stats.getStats().getTotalUserExceptions(),
bldr);
+        stats.getStats().getTotalDeserializationExceptions().forEach((topic, count) ->
{
+            addSystemMetrics("__total_deserialization_exceptions__" + topic, count, bldr);
+        });
+        addSystemMetrics("__total_serialization_exceptions__",
+                stats.getStats().getTotalSerializationExceptions(), bldr);
+        addSystemMetrics("__avg_latency_ms__", stats.getStats().computeLatency(), bldr);
+        return bldr;
+    }
+
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
         functionStatusBuilder.setNumProcessed(stats.getTotalStats().getTotalProcessed());
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index b4e814dd54..49c77d61a3 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@
   name='InstanceCommunication.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08
\x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n
\x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b
\x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r
\x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01
\x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01
\x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01
\x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01
\x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04
\x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02
\x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01
\x01(\x08\x32\xde\x01\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
+  serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
\x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07
\x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08
\x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n
\x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b
\x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r
\x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01
\x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01
\x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01
\x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01
\x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04
\x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02
\x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01
\x01(\x08\x32\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -513,7 +513,7 @@
   index=0,
   options=None,
   serialized_start=1068,
-  serialized_end=1290,
+  serialized_end=1416,
   methods=[
   _descriptor.MethodDescriptor(
     name='GetFunctionStatus',
@@ -533,10 +533,28 @@
     output_type=_METRICSDATA,
     options=None,
   ),
+  _descriptor.MethodDescriptor(
+    name='ResetMetrics',
+    full_name='proto.InstanceControl.ResetMetrics',
+    index=2,
+    containing_service=None,
+    input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
+    output_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
+    options=None,
+  ),
+  _descriptor.MethodDescriptor(
+    name='GetMetrics',
+    full_name='proto.InstanceControl.GetMetrics',
+    index=3,
+    containing_service=None,
+    input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
+    output_type=_METRICSDATA,
+    options=None,
+  ),
   _descriptor.MethodDescriptor(
     name='HealthCheck',
     full_name='proto.InstanceControl.HealthCheck',
-    index=2,
+    index=4,
     containing_service=None,
     input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
     output_type=_HEALTHCHECKRESULT,
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
index 575f994641..21730e1b52 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
@@ -44,6 +44,16 @@ def __init__(self, channel):
         request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
         response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
         )
+    self.ResetMetrics = channel.unary_unary(
+        '/proto.InstanceControl/ResetMetrics',
+        request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+        response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+        )
+    self.GetMetrics = channel.unary_unary(
+        '/proto.InstanceControl/GetMetrics',
+        request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+        response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
+        )
     self.HealthCheck = channel.unary_unary(
         '/proto.InstanceControl/HealthCheck',
         request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
@@ -69,6 +79,20 @@ def GetAndResetMetrics(self, request, context):
     context.set_details('Method not implemented!')
     raise NotImplementedError('Method not implemented!')
 
+  def ResetMetrics(self, request, context):
+    # missing associated documentation comment in .proto file
+    pass
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+  def GetMetrics(self, request, context):
+    # missing associated documentation comment in .proto file
+    pass
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
   def HealthCheck(self, request, context):
     # missing associated documentation comment in .proto file
     pass
@@ -89,6 +113,16 @@ def add_InstanceControlServicer_to_server(servicer, server):
           request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
           response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
       ),
+      'ResetMetrics': grpc.unary_unary_rpc_method_handler(
+          servicer.ResetMetrics,
+          request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+          response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+      ),
+      'GetMetrics': grpc.unary_unary_rpc_method_handler(
+          servicer.GetMetrics,
+          request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+          response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
+      ),
       'HealthCheck': grpc.unary_unary_rpc_method_handler(
           servicer.HealthCheck,
           request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 35a5443b9e..913ea33d1d 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -54,6 +54,7 @@ def __init__(self, instance_config, logger, pulsar_client, user_code, consumers)
     self.pulsar_client = pulsar_client
     self.user_code_dir = os.path.dirname(user_code)
     self.consumers = consumers
+    self.current_accumulated_metrics = {}
     self.accumulated_metrics = {}
     self.publish_producers = {}
     self.publish_serializers = {}
@@ -107,9 +108,9 @@ def get_user_config_map(self):
     return self.user_config
 
   def record_metric(self, metric_name, metric_value):
-    if not metric_name in self.accumulated_metrics:
-      self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
-    self.accumulated_metrics[metric_name].update(metric_value)
+    if not metric_name in self.current_accumulated_metrics:
+      self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum()
+    self.current_accumulated_metrics[metric_name].update(metric_value)
 
   def get_output_topic(self):
     return self.instance_config.function_details.output
@@ -146,6 +147,18 @@ def ack(self, msgid, topic):
     self.consumers[topic].acknowledge(msgid)
 
   def get_and_reset_metrics(self):
+    metrics = self.get_metrics()
+    # TODO(sanjeev):- Make this thread safe
+    self.reset_metrics()
+    return metrics
+
+  def reset_metrics(self):
+    # TODO: Make it thread safe
+    self.accumulated_metrics.clear()
+    self.accumulated_metrics.update(self.current_accumulated_metrics)
+    self.current_accumulated_metrics.clear()
+
+  def get_metrics(self):
     metrics = InstanceCommunication_pb2.MetricsData()
     for metric_name, accumulated_metric in self.accumulated_metrics.items():
       m = InstanceCommunication_pb2.MetricsData.DataDigest()
@@ -154,6 +167,4 @@ def get_and_reset_metrics(self):
       m.max = accumulated_metric.max
       m.min = accumulated_metric.min
       metrics.metrics[metric_name] = m
-    # TODO(sanjeev):- Make this thread safe
-    self.accumulated_metrics.clear()
-    return metrics
+    return metrics
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 71202498d9..70b78f0107 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -97,6 +97,22 @@ def compute_latency(self):
     else:
       return self.latency / self.nsuccessfullyprocessed
 
+  def update(self, object):
+    """Copy every counter and collection of `object` (another Stats) into this snapshot."""
+    self.nprocessed = object.nprocessed
+    self.nsuccessfullyprocessed = object.nsuccessfullyprocessed
+    self.nuserexceptions = object.nuserexceptions
+    self.nsystemexceptions = object.nsystemexceptions
+    self.nserialization_exceptions = object.nserialization_exceptions
+    self.latency = object.latency
+    self.lastinvocationtime = object.lastinvocationtime
+    # Copy the elements; append() would nest the whole source list as a single element.
+    self.latestuserexceptions = list(object.latestuserexceptions)
+    self.latestsystemexceptions = list(object.latestsystemexceptions)
+    self.ndeserialization_exceptions.clear()
+    self.ndeserialization_exceptions.update(object.ndeserialization_exceptions)
+    
+
 class PythonInstance(object):
   def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples,
user_code, log_topic, pulsar_client):
     self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details,
max_buffered_tuples)
@@ -119,6 +135,7 @@ def __init__(self, instance_id, function_id, function_version, function_details,
     self.contextimpl = None
     self.total_stats = Stats()
     self.current_stats = Stats()
+    self.stats = Stats()
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs
> 0 else None
 
@@ -278,19 +295,30 @@ def message_listener(self, topic, serde, consumer, message):
 
   def get_and_reset_metrics(self):
     # First get any user metrics
-    metrics = self.contextimpl.get_and_reset_metrics()
+    metrics = self.get_metrics()
+    self.reset_metrics()
+    return metrics
+
+  def reset_metrics(self):
+    self.stats.update(self.current_stats)
+    self.current_stats.reset()
+    self.contextimpl.reset_metrics()
+
+  def get_metrics(self):
+    # First get any user metrics
+    metrics = self.contextimpl.get_metrics()
     # Now add system metrics as well
-    self.add_system_metrics("__total_processed__", self.current_stats.nprocessed, metrics)
-    self.add_system_metrics("__total_successfully_processed__", self.current_stats.nsuccessfullyprocessed,
metrics)
-    self.add_system_metrics("__total_system_exceptions__", self.current_stats.nsystemexceptions,
metrics)
-    self.add_system_metrics("__total_user_exceptions__", self.current_stats.nuserexceptions,
metrics)
-    for (topic, metric) in self.current_stats.ndeserialization_exceptions.items():
+    self.add_system_metrics("__total_processed__", self.stats.nprocessed, metrics)
+    self.add_system_metrics("__total_successfully_processed__", self.stats.nsuccessfullyprocessed,
metrics)
+    self.add_system_metrics("__total_system_exceptions__", self.stats.nsystemexceptions,
metrics)
+    self.add_system_metrics("__total_user_exceptions__", self.stats.nuserexceptions, metrics)
+    for (topic, metric) in self.stats.ndeserialization_exceptions.items():
       self.add_system_metrics("__total_deserialization_exceptions__" + topic, metric, metrics)
-    self.add_system_metrics("__total_serialization_exceptions__", self.current_stats.nserialization_exceptions,
metrics)
-    self.add_system_metrics("__avg_latency_ms__", self.current_stats.compute_latency(), metrics)
-    self.current_stats.reset()
+    self.add_system_metrics("__total_serialization_exceptions__", self.stats.nserialization_exceptions,
metrics)
+    self.add_system_metrics("__avg_latency_ms__", self.stats.compute_latency(), metrics)
     return metrics
 
+
   def add_system_metrics(self, metric_name, value, metrics):
     metrics.metrics[metric_name].count = value
     metrics.metrics[metric_name].sum = value
diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py
index 193e6ab928..611a737031 100644
--- a/pulsar-functions/instance/src/main/python/server.py
+++ b/pulsar-functions/instance/src/main/python/server.py
@@ -42,6 +42,15 @@ def GetAndResetMetrics(self, request, context):
     Log.info("Came in GetAndResetMetrics")
     return self.pyinstance.get_and_reset_metrics()
 
+  def ResetMetrics(self, request, context):
+    Log.info("Came in ResetMetrics")
+    self.pyinstance.reset_metrics()
+    return request
+
+  def GetMetrics(self, request, context):
+    Log.info("Came in GetMetrics")
+    return self.pyinstance.get_metrics()
+
   def HealthCheck(self, request, context):
     return self.pyinstance.health_check()
 
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 8db742b641..ed8c95a7e9 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -72,5 +72,7 @@ message HealthCheckResult {
 service InstanceControl {
     rpc GetFunctionStatus(google.protobuf.Empty) returns (FunctionStatus) {}
     rpc GetAndResetMetrics(google.protobuf.Empty) returns (MetricsData) {}
+    rpc ResetMetrics(google.protobuf.Empty) returns (google.protobuf.Empty) {}
+    rpc GetMetrics(google.protobuf.Empty) returns (MetricsData) {}
     rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResult) {}
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 5a63ec3691..0c10fdbcb8 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -348,6 +348,56 @@ public void getAndResetMetrics(com.google.protobuf.Empty request,
             }
         }
 
+        /** Returns the runtime's metrics without resetting them. */
+        @Override
+        public void getMetrics(com.google.protobuf.Empty request,
+                               io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
+            Runtime runtime = runtimeSpawner.getRuntime();
+            if (runtime == null) {
+                // Reply with an error; otherwise the call is never completed when no runtime is running.
+                responseObserver.onError(io.grpc.Status.UNAVAILABLE
+                        .withDescription("Function runtime is not running").asRuntimeException());
+                return;
+            }
+            try {
+                InstanceCommunication.MetricsData metrics = runtime.getMetrics().get();
+                responseObserver.onNext(metrics);
+                responseObserver.onCompleted();
+            } catch (InterruptedException | ExecutionException e) {
+                if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+                log.error("Exception in JavaInstance doing getMetrics", e);
+                responseObserver.onError(e);
+            }
+        }
+
+        /** Asks the runtime to reset its metrics and acknowledges with an empty response. */
+        @Override
+        public void resetMetrics(com.google.protobuf.Empty request,
+                                 io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
+            Runtime runtime = runtimeSpawner.getRuntime();
+            if (runtime == null) {
+                // Reply with an error; otherwise the call is never completed when no runtime is running.
+                responseObserver.onError(io.grpc.Status.UNAVAILABLE
+                        .withDescription("Function runtime is not running").asRuntimeException());
+                return;
+            }
+            try {
+                runtime.resetMetrics().get();
+                // A unary RPC must deliver exactly one response: completing without onNext() makes
+                // grpc-java clients fail with "No value received for unary call".
+                responseObserver.onNext(com.google.protobuf.Empty.getDefaultInstance());
+                responseObserver.onCompleted();
+            } catch (InterruptedException | ExecutionException e) {
+                if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+                log.error("Exception in JavaInstance doing resetMetrics", e);
+                responseObserver.onError(e);
+            }
+        }
+        
         @Override
         public void healthCheck(com.google.protobuf.Empty request,
                                 io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult>
responseObserver) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index bd4e3ae2d3..95d7a53f47 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -347,6 +347,50 @@ public void onSuccess(InstanceCommunication.MetricsData t) {
         return retval;
     }
 
+    @Override
+    public CompletableFuture<Void> resetMetrics() {
+        CompletableFuture<Void> retval = new CompletableFuture<>();
+        if (stub == null) {
+            retval.completeExceptionally(new RuntimeException("Not alive"));
+            return retval;
+        }
+        ListenableFuture<Empty> response = stub.resetMetrics(Empty.newBuilder().build());
+        Futures.addCallback(response, new FutureCallback<Empty>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                retval.completeExceptionally(throwable);
+            }
+
+            @Override
+            public void onSuccess(Empty t) {
+                retval.complete(null);
+            }
+        });
+        return retval;
+    }
+
+    @Override
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+        CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
+        if (stub == null) {
+            retval.completeExceptionally(new RuntimeException("Not alive"));
+            return retval;
+        }
+        ListenableFuture<InstanceCommunication.MetricsData> response = stub.getMetrics(Empty.newBuilder().build());
+        Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                retval.completeExceptionally(throwable);
+            }
+
+            @Override
+            public void onSuccess(InstanceCommunication.MetricsData t) {
+                retval.complete(t);
+            }
+        });
+        return retval;
+    }
+    
     public CompletableFuture<InstanceCommunication.HealthCheckResult> healthCheck() {
         CompletableFuture<InstanceCommunication.HealthCheckResult> retval = new CompletableFuture<>();
         if (stub == null) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index 3c71800aae..ea992901f7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -41,5 +41,9 @@
     CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus();
 
     CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics();
+    
+    CompletableFuture<Void> resetMetrics();
+    
+    CompletableFuture<InstanceCommunication.MetricsData> getMetrics();
 
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 9d9654d673..3b53fb866b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -113,6 +113,18 @@ public void stop() {
     public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
         return CompletableFuture.completedFuture(javaInstanceRunnable.getAndResetMetrics());
     }
+    
+    
+    @Override
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+        return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());
+    }
+
+    @Override
+    public CompletableFuture<Void> resetMetrics() {
+        javaInstanceRunnable.resetMetrics();
+        return CompletableFuture.completedFuture(null);
+    }
 
     @Override
     public boolean isAlive() {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 08de636d2a..eb5afb9ed3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -31,6 +31,7 @@
 import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 
@@ -42,6 +43,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -438,6 +440,22 @@ public synchronized void processAssignmentUpdate(MessageId messageId, Assignment
     public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
         return this.functionRuntimeInfoMap;
     }
+    
+    public void updateRates() {
+        for (Entry<String, FunctionRuntimeInfo> entry : this.functionRuntimeInfoMap.entrySet()) {
+            RuntimeSpawner functionRuntimeSpawner = entry.getValue().getRuntimeSpawner();
+            if (functionRuntimeSpawner != null) {
+                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+                if (functionRuntime != null) {
+                    try {
+                        functionRuntime.resetMetrics().get();
+                    } catch (Exception e) {
+                        log.error("Failed to update stats for {}-{}", entry.getKey(), e.getMessage());
+                    }
+                }
+            }
+        }
+    }
     /**
      * Private methods for internal use.  Should not be used outside of this class
      */
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 0a829cbf08..745f5d7297 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -49,7 +49,9 @@ public static void generate(WorkerService workerService, String cluster, SimpleT
                     Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
                     if (functionRuntime != null) {
                         try {
-                            InstanceCommunication.MetricsData metrics = functionRuntime.getAndResetMetrics().get();
+                            InstanceCommunication.MetricsData metrics = workerService.getWorkerConfig()
+                                    .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get()
+                                            : functionRuntime.getAndResetMetrics().get();
                             for (Map.Entry<String, InstanceCommunication.MetricsData.DataDigest> metricsEntry
                                     : metrics.getMetricsMap().entrySet()) {
                                 String metricName = metricsEntry.getKey();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index c7ebcf9d44..9d5dfaa187 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -73,7 +73,8 @@
     private String tlsTrustCertsFilePath = "";
     private boolean tlsAllowInsecureConnection = false;
     private boolean tlsHostnameVerificationEnable = false;
-    
+    private int metricsSamplingPeriodSec = 60;
+
     @Data
     @Setter
     @Getter
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 439333190f..29dff537f1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -20,7 +20,14 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -49,11 +56,14 @@
     private MembershipManager membershipManager;
     private SchedulerManager schedulerManager;
     private boolean isInitialized = false;
+    private final ScheduledExecutorService statsUpdater;
 
     private ConnectorsManager connectorsManager;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
+        this.statsUpdater = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater"));
     }
 
     public void start(URI dlogUri) throws InterruptedException {
@@ -138,6 +148,14 @@ public void start(URI dlogUri) throws InterruptedException {
             // indicate function worker service is done intializing
             this.isInitialized = true;
 
+            this.connectorsManager = new ConnectorsManager(workerConfig);
+            
+            int metricsSamplingPeriodSec = this.workerConfig.getMetricsSamplingPeriodSec();
+            if (metricsSamplingPeriodSec > 0) {
+                this.statsUpdater.scheduleAtFixedRate(() -> this.functionRuntimeManager.updateRates(),
+                        metricsSamplingPeriodSec, metricsSamplingPeriodSec, TimeUnit.SECONDS);
+            }
+
         } catch (Throwable t) {
             log.error("Error Starting up in worker", t);
             throw new RuntimeException(t);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index 7bd85e82a5..d320514ed5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -52,6 +52,7 @@ public void testFunctionsStatsGenerate() {
 
         WorkerService workerService = mock(WorkerService.class);
         doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager();
+        doReturn(new WorkerConfig()).when(workerService).getWorkerConfig();
 
         CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>();
         InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()
@@ -66,7 +67,7 @@ public void testFunctionsStatsGenerate() {
 
         metricsDataCompletableFuture.complete(metricsData);
         Runtime runtime = mock(Runtime.class);
-        doReturn(metricsDataCompletableFuture).when(runtime).getAndResetMetrics();
+        doReturn(metricsDataCompletableFuture).when(runtime).getMetrics();
 
         RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class);
         doReturn(runtime).when(runtimeSpawner).getRuntime();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message