pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] jerrypeng closed pull request #2994: cleaning up and improving function metrics
Date Sat, 17 Nov 2018 05:28:44 GMT
jerrypeng closed pull request #2994:  cleaning up and improving function metrics
URL: https://github.com/apache/pulsar/pull/2994
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml
index 71567cd9eb..72ee09b4d8 100644
--- a/distribution/server/src/assemble/bin.xml
+++ b/distribution/server/src/assemble/bin.xml
@@ -88,11 +88,6 @@
       <destName>java-instance.jar</destName>
       <outputDirectory>instances</outputDirectory>
     </file>
-    <file>
-      <source>${basedir}/../../pulsar-functions/metrics/target/PrometheusMetricsServer.jar</source>
-      <destName>PrometheusMetricsServer.jar</destName>
-      <outputDirectory>instances</outputDirectory>
-    </file>
     <file>
       <source>${basedir}/../../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar</source>
       <destName>api-examples.jar</destName>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 61ecac40ba..6f7e43e899 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -37,6 +37,7 @@
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -164,7 +165,7 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tena
 
     @GET
     @ApiOperation(
-            value = "Displays the status of a Pulsar Function running in cluster mode",
+            value = "Displays the status of a Pulsar Function",
             response = FunctionStatus.class
     )
     @ApiResponses(value = {
@@ -179,6 +180,40 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
             tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
     }
 
+    @GET
+    @ApiOperation(
+            value = "Displays the stats of a Pulsar Function",
+            response = FunctionStats.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/stats")
+    public Response getFunctionStats(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName) throws IOException {
+        return functions.getFunctionStats(tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the stats of a Pulsar Function instance",
+            response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
+    public Response getFunctionInstanceStats(final @PathParam("tenant") String tenant,
+                                             final @PathParam("namespace") String namespace,
+                                             final @PathParam("functionName") String functionName,
+                                             final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionsInstanceStats(
+                tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
+    }
+
     @GET
     @ApiOperation(
             value = "Lists all Pulsar Functions currently deployed in a given namespace",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index 1a9c7ad10a..a0ec3fe5bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -54,7 +54,7 @@ public WorkerService get() {
     @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
     public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
-        return worker.getWorkerMetrcis(clientAppId());
+        return worker.getWorkerMetrics(clientAppId());
     }
 
     @GET
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 270dc8d6b4..f909557b8b 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -25,6 +25,7 @@
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -198,6 +199,39 @@
     FunctionStatus getFunctionStatus(String tenant, String namespace, String function, int id)
             throws PulsarAdminException;
 
+    /**
+     * Gets the current stats of a function instance.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param id
+     *            Function instance-id
+     * @return
+     * @throws PulsarAdminException
+     */
+    FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionStats(String tenant, String namespace, String function, int id)
+            throws PulsarAdminException;
+
+    /**
+     * Gets the current stats of a function.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @return
+     * @throws PulsarAdminException
+     */
+
+    FunctionStats getFunctionStats(String tenant, String namespace, String function)
+            throws PulsarAdminException;
+
     /**
      * Restart function instance
      *
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 20ea74bee8..753955e8de 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -22,38 +22,36 @@
 import com.google.protobuf.AbstractMessage.Builder;
 import com.google.protobuf.MessageOrBuilder;
 import com.google.protobuf.util.JsonFormat;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.StandardCopyOption;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import javax.ws.rs.ClientErrorException;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.GenericType;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.WorkerInfo;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
 import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
 
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.StandardCopyOption;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 @Slf4j
 public class FunctionsImpl extends BaseResource implements Functions {
 
@@ -126,6 +124,33 @@ public FunctionStatus getFunctionStatus(
         }
     }
 
+    @Override
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionStats(String tenant, String namespace, String function, int id) throws PulsarAdminException {
+        try {
+            Response response = request(
+                    functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionStats getFunctionStats(String tenant, String namespace, String function) throws PulsarAdminException {
+        try {
+            Response response = request(
+                    functions.path(tenant).path(namespace).path(function).path("stats")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(FunctionStats.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }    }
+
     @Override
     public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
         try {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 0524a657ab..28f9183930 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -76,6 +76,8 @@
     private final UpdateFunction updater;
     private final GetFunction getter;
     private final GetFunctionStatus functionStatus;
+    @Getter
+    private final GetFunctionStats functionStats;
     private final RestartFunction restart;
     private final StopFunction stop;
     private final ListFunctions lister;
@@ -638,6 +640,24 @@ void runCmd() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Get the current stats of a Pulsar Function")
+    class GetFunctionStats extends FunctionCommand {
+
+        @Parameter(names = "--instance-id", description = "The function instanceId (Get-status of all instances if instance-id is not provided")
+        protected String instanceId;
+
+        @Override
+        void runCmd() throws Exception {
+
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            if (isBlank(instanceId)) {
+                System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, namespace, functionName)));
+            } else {
+                System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId))));
+            }
+        }
+    }
+
     @Parameters(commandDescription = "Restart function instance")
     class RestartFunction extends FunctionCommand {
 
@@ -878,6 +898,7 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
         updater = new UpdateFunction();
         getter = new GetFunction();
         functionStatus = new GetFunctionStatus();
+        functionStats = new GetFunctionStats();
         lister = new ListFunctions();
         stateGetter = new StateGetter();
         triggerer = new TriggerFunction();
@@ -893,6 +914,7 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
         jcommander.addCommand("restart", getRestarter());
         jcommander.addCommand("stop", getStopper());
         jcommander.addCommand("getstatus", getStatuser());
+        jcommander.addCommand("stats", getFunctionStats());
         jcommander.addCommand("list", getLister());
         jcommander.addCommand("querystate", getStateGetter());
         jcommander.addCommand("trigger", getTriggerer());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
new file mode 100644
index 0000000000..99e52e53b0
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.Data;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+@Data
+public class FunctionStats {
+
+    /**
+     * Overall total number of records function received from source
+     **/
+    public long receivedTotal;
+
+    /**
+     * Overall total number of records successfully processed by user function
+     **/
+    public long processedSuccessfullyTotal;
+
+    /**
+     * Overall total number of system exceptions thrown
+     **/
+    public long systemExceptionsTotal;
+
+    /**
+     * Overall total number of user exceptions thrown
+     **/
+    public long userExceptionsTotal;
+
+    /**
+     * Average process latency for function
+     **/
+    public double avgProcessLatency;
+
+    /**
+     * Timestamp of when the function was last invoked by any instance
+     **/
+    public long lastInvocation;
+
+    @Data
+    public static class FunctionInstanceStats {
+
+        /** Instance Id of function instance **/
+        public int instanceId;
+
+        @Data
+        public static class FunctionInstanceStatsData {
+
+            /**
+             * Total number of records function received from source for instance
+             **/
+            public long receivedTotal;
+
+            /**
+             * Total number of records successfully processed by user function for instance
+             **/
+            public long processedSuccessfullyTotal;
+
+            /**
+             * Total number of system exceptions thrown for instance
+             **/
+            public long systemExceptionsTotal;
+
+            /**
+             * Total number of user exceptions thrown for instance
+             **/
+            public long userExceptionsTotal;
+
+            /**
+             * Average process latency for function for instance
+             **/
+            public double avgProcessLatency;
+
+            /**
+             * Timestamp of when the function was last invoked for instance
+             **/
+            public long lastInvocation;
+
+            /**
+             * Map of user defined metrics
+             **/
+            public Map<String, Double> userMetrics = new HashMap<>();
+        }
+
+        public FunctionInstanceStatsData metrics = new FunctionInstanceStatsData();
+    }
+
+    public List<FunctionInstanceStats> instances = new LinkedList<>();
+
+    public void addInstance(FunctionInstanceStats functionInstanceStats) {
+        instances.add(functionInstanceStats);
+    }
+
+    public FunctionStats calculateOverall() {
+
+        lastInvocation = 0;
+        instances.forEach(new Consumer<FunctionInstanceStats>() {
+            @Override
+            public void accept(FunctionInstanceStats functionInstanceStats) {
+                FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData = functionInstanceStats.getMetrics();
+                receivedTotal += functionInstanceStatsData.receivedTotal;
+                processedSuccessfullyTotal += functionInstanceStatsData.processedSuccessfullyTotal;
+                systemExceptionsTotal += functionInstanceStatsData.systemExceptionsTotal;
+                userExceptionsTotal += functionInstanceStatsData.userExceptionsTotal;
+                avgProcessLatency += functionInstanceStatsData.avgProcessLatency;
+                if (functionInstanceStatsData.lastInvocation > lastInvocation) {
+                    lastInvocation = functionInstanceStatsData.lastInvocation;
+                }
+
+            }
+        });
+        // calculate average from sum
+        avgProcessLatency = avgProcessLatency / instances.size();
+        return this;
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index d13964df34..06048795a7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -50,11 +50,10 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.pulsar.functions.instance.FunctionStatsManager.USER_METRIC_PREFIX;
 
 /**
  * This class implements the Context interface exposed to the user.
@@ -67,35 +66,6 @@
     // Per Message related
     private Record<?> record;
 
-    @Getter
-    @Setter
-    private class AccumulatedMetricDatum {
-        private double count;
-        private double sum;
-        private double max;
-        private double min;
-
-        AccumulatedMetricDatum() {
-            count = 0;
-            sum = 0;
-            max = Double.MIN_VALUE;
-            min = Double.MAX_VALUE;
-        }
-
-        public void update(double value) {
-            count++;
-            sum += value;
-            if (max < value) {
-                max = value;
-            }
-            if (min > value) {
-                min = value;
-            }
-        }
-    }
-
-    private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;
-
     private Map<String, Producer<?>> publishProducers;
     private ProducerBuilderImpl<?> producerBuilder;
 
@@ -118,15 +88,14 @@ public void update(double value) {
     private final static String[] userMetricsLabelNames;
     static {
         // add label to indicate user metric
-        userMetricsLabelNames = Arrays.copyOf(FunctionStats.metricsLabelNames, FunctionStats.metricsLabelNames.length + 1);
-        userMetricsLabelNames[FunctionStats.metricsLabelNames.length] = "metric";
+        userMetricsLabelNames = Arrays.copyOf(FunctionStatsManager.metricsLabelNames, FunctionStatsManager.metricsLabelNames.length + 1);
+        userMetricsLabelNames[FunctionStatsManager.metricsLabelNames.length] = "metric";
     }
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
                        SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels) {
         this.config = config;
         this.logger = logger;
-        this.accumulatedMetrics = new ConcurrentHashMap<>();
         this.publishProducers = new HashMap<>();
         this.inputTopics = inputTopics;
         this.topicSchema = new TopicSchema(client);
@@ -342,40 +311,43 @@ public ByteBuffer getState(String key) {
 
     @Override
     public void recordMetric(String metricName, double value) {
-        userMetricsLabels.computeIfAbsent(metricName,
-                s -> {
-                    String[] userMetricLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
-                    userMetricLabels[userMetricLabels.length - 1] = metricName;
-                    return userMetricLabels;
-                });
-
-        userMetricsSummary.labels(userMetricsLabels.get(metricName)).observe(value);
-        accumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum());
-        accumulatedMetrics.get(metricName).update(value);
+        String[] userMetricLabels = userMetricsLabels.get(metricName);
+        if (userMetricLabels == null) {
+            userMetricLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
+            userMetricLabels[userMetricLabels.length - 1] = metricName;
+            // set label for metrics before putting into userMetricsLabels map to
+            // prevent race condition with getMetrics calls
+            userMetricsSummary.labels(userMetricLabels).observe(value);
+            userMetricsLabels.put(metricName, userMetricLabels);
+        } else {
+            userMetricsSummary.labels(userMetricLabels).observe(value);
+        }
     }
 
-    public MetricsData getAndResetMetrics() {
-        MetricsData retval = getMetrics();
+    public Map<String, Double> getAndResetMetrics() {
+        Map<String, Double> retval = getMetrics();
         resetMetrics();
         return retval;
     }
 
     public void resetMetrics() {
         userMetricsSummary.clear();
-        this.accumulatedMetrics.clear();
     }
 
-    public MetricsData getMetrics() {
-        MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder();
-        for (String metricName : accumulatedMetrics.keySet()) {
-            MetricsData.DataDigest.Builder bldr = MetricsData.DataDigest.newBuilder();
-            bldr.setSum(accumulatedMetrics.get(metricName).getSum());
-            bldr.setCount(accumulatedMetrics.get(metricName).getCount());
-            bldr.setMax(accumulatedMetrics.get(metricName).getMax());
-            bldr.setMin(accumulatedMetrics.get(metricName).getMax());
-            metricsDataBuilder.putMetrics(metricName, bldr.build());
+    public Map<String, Double> getMetrics() {
+        Map<String, Double> metricsMap = new HashMap<>();
+        for (Map.Entry<String, String[]> userMetricsLabelsEntry : userMetricsLabels.entrySet()) {
+            String metricName = userMetricsLabelsEntry.getKey();
+            String[] labels = userMetricsLabelsEntry.getValue();
+            Summary.Child.Value summary = userMetricsSummary.labels(labels).get();
+            metricsMap.put(String.format("%s%s_sum", USER_METRIC_PREFIX, metricName), summary.sum);
+            metricsMap.put(String.format("%s%s_count", USER_METRIC_PREFIX, metricName), summary.count);
+            for (Map.Entry<Double, Double> entry : summary.quantiles.entrySet()) {
+                Double quantile = entry.getKey();
+                Double value = entry.getValue();
+                metricsMap.put(String.format("%s%s_%s", USER_METRIC_PREFIX, metricName, quantile), value);
+            }
         }
-        MetricsData retval = metricsDataBuilder.build();
-        return retval;
+        return metricsMap;
     }
 }
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
similarity index 79%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index c45699722e..388efb14b2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -34,18 +34,21 @@
 @Slf4j
 @Getter
 @Setter
-public class FunctionStats {
+public class FunctionStatsManager {
 
     static final String[] metricsLabelNames = {"tenant", "namespace", "function", "instance_id", "cluster"};
 
+    public static final String PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_";
+    public final static String USER_METRIC_PREFIX = "user_metric_";
+
     /** Declare metric names **/
-    static final String PULSAR_FUNCTION_PROCESSED_TOTAL = "pulsar_function_processed_total";
-    static final String PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL = "pulsar_function_processed_successfully_total";
-    static final String PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL = "pulsar_function_system_exceptions_total";
-    static final String PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL = "pulsar_function_user_exceptions_total";
-    static final String PULSAR_FUNCTION_PROCESS_LATENCY_MS = "pulsar_function_process_latency_ms";
-    static final String PULSAR_FUNCTION_LAST_INVOCATION = "pulsar_function_last_invocation";
-    static final String PULSAR_FUNCTION_RECEIVED_TOTAL = "pulsar_function_received_total";
+    public static final String PROCESSED_TOTAL = "processed_total";
+    public static final String PROCESSED_SUCCESSFULLY_TOTAL = "processed_successfully_total";
+    public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total";
+    public static final String USER_EXCEPTIONS_TOTAL = "user_exceptions_total";
+    public static final String PROCESS_LATENCY_MS = "process_latency_ms";
+    public static final String LAST_INVOCATION = "last_invocation";
+    public static final String RECEIVED_TOTAL = "received_total";
 
     /** Declare Prometheus stats **/
 
@@ -70,37 +73,37 @@
     @Getter
     private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10);
 
-    public FunctionStats(CollectorRegistry collectorRegistry) {
+    public FunctionStatsManager(CollectorRegistry collectorRegistry) {
         // Declare function local collector registry so that it will not clash with other function instances'
         // metrics collection especially in threaded mode
         functionCollectorRegistry = new CollectorRegistry();
 
         statTotalProcessed = Counter.build()
-                .name(PULSAR_FUNCTION_PROCESSED_TOTAL)
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL)
                 .help("Total number of messages processed.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
         statTotalProcessedSuccessfully = Counter.build()
-                .name(PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL)
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL)
                 .help("Total number of messages processed successfully.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
         statTotalSysExceptions = Counter.build()
-                .name(PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL)
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
                 .help("Total number of system exceptions.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
         statTotalUserExceptions = Counter.build()
-                .name(PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL)
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL)
                 .help("Total number of user exceptions.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
         statProcessLatency = Summary.build()
-                .name(PULSAR_FUNCTION_PROCESS_LATENCY_MS)
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
                 .help("Process latency in milliseconds.")
                 .quantile(0.5, 0.01)
                 .quantile(0.9, 0.01)
@@ -110,13 +113,13 @@ public FunctionStats(CollectorRegistry collectorRegistry) {
                 .register(collectorRegistry);
 
         statlastInvocation = Gauge.build()
-                .name(PULSAR_FUNCTION_LAST_INVOCATION)
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
                 .help("The timestamp of the last invocation of the function")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
         statTotalRecordsRecieved = Counter.build()
-                .name(PULSAR_FUNCTION_RECEIVED_TOTAL)
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
                 .help("Total number of messages received from source.")
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 39acdb57c9..acd0781161 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -26,6 +26,8 @@
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
+import java.util.Map;
+
 /**
  * This is the Java Instance. This is started by the runtimeSpawner using the JavaInstanceClient
  * program if invoking via a process based invocation or using JavaInstance using a thread
@@ -74,7 +76,7 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input) {
     public void close() {
     }
 
-    public InstanceCommunication.MetricsData getAndResetMetrics() {
+    public Map<String, Double> getAndResetMetrics() {
         return context.getAndResetMetrics();
     }
 
@@ -82,7 +84,7 @@ public void resetMetrics() {
         context.resetMetrics();
     }
 
-    public InstanceCommunication.MetricsData getMetrics() {
+    public Map<String, Double> getMetrics() {
         return context.getMetrics();
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ab900ac40c..8ede5515fe 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -109,7 +109,7 @@
     private Throwable deathException;
 
     // function stats
-    private final FunctionStats stats;
+    private final FunctionStatsManager stats;
 
     private Record<?> currentRecord;
 
@@ -133,7 +133,7 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
         this.jarFile = jarFile;
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
-        this.stats = new FunctionStats(collectorRegistry);
+        this.stats = new FunctionStatsManager(collectorRegistry);
         this.secretsProvider = secretsProvider;
         this.collectorRegistry = collectorRegistry;
         this.metricsLabels = new String[]{
@@ -423,23 +423,17 @@ public void close() {
     }
 
     public InstanceCommunication.MetricsData getAndResetMetrics() {
-        InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
+        InstanceCommunication.MetricsData metricsData = getMetrics();
         stats.reset();
-        if (javaInstance != null) {
-            InstanceCommunication.MetricsData userMetrics =  javaInstance.getAndResetMetrics();
-            if (userMetrics != null) {
-                bldr.putAllMetrics(userMetrics.getMetricsMap());
-            }
-        }
-        return bldr.build();
+        return metricsData;
     }
 
     public InstanceCommunication.MetricsData getMetrics() {
         InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
         if (javaInstance != null) {
-            InstanceCommunication.MetricsData userMetrics =  javaInstance.getMetrics();
+            Map<String, Double> userMetrics =  javaInstance.getMetrics();
             if (userMetrics != null) {
-                bldr.putAllMetrics(userMetrics.getMetricsMap());
+                bldr.putAllUserMetrics(userMetrics);
             }
         }
         return bldr.build();
@@ -452,16 +446,16 @@ public void resetMetrics() {
 
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
-        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_TOTAL, stats.statTotalProcessed.labels(metricsLabels).get(), bldr);
-        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL, stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr);
-        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL,  stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr);
-        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL, stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr);
-        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_RECEIVED_TOTAL, stats.statTotalRecordsRecieved.labels(metricsLabels).get(), bldr);
-        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESS_LATENCY_MS,
-                stats.statProcessLatency.labels(metricsLabels).get().count <= 0.0
-                        ? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count,
-                bldr);
-        addSystemMetrics(FunctionStats.PULSAR_FUNCTION_LAST_INVOCATION, stats.statlastInvocation.labels(metricsLabels).get(), bldr);
+
+        bldr.setProcessedTotal((long) stats.statTotalProcessed.labels(metricsLabels).get());
+        bldr.setProcessedSuccessfullyTotal((long) stats.statTotalProcessedSuccessfully.labels(metricsLabels).get());
+        bldr.setSystemExceptionsTotal((long) stats.statTotalSysExceptions.labels(metricsLabels).get());
+        bldr.setUserExceptionsTotal((long) stats.statTotalUserExceptions.labels(metricsLabels).get());
+        bldr.setReceivedTotal((long) stats.statTotalRecordsRecieved.labels(metricsLabels).get());
+        bldr.setAvgProcessLatency(stats.statProcessLatency.labels(metricsLabels).get().count <= 0.0
+                ? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count);
+        bldr.setLastInvocation((long) stats.statlastInvocation.labels(metricsLabels).get());
+
         return bldr;
     }
 
@@ -485,13 +479,6 @@ private Builder createMetricsDataBuilder() {
         return functionStatusBuilder;
     }
 
-    private static void addSystemMetrics(String metricName, double value, InstanceCommunication.MetricsData.Builder bldr) {
-        InstanceCommunication.MetricsData.DataDigest digest =
-                InstanceCommunication.MetricsData.DataDigest.newBuilder()
-                        .setCount(value).setSum(value).setMax(value).setMin(0).build();
-        bldr.putMetrics(metricName, digest);
-    }
-
     private void setupLogHandler() {
         if (instanceConfig.getFunctionDetails().getLogTopic() != null &&
                 !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index f33f4f45d9..6c843dccd9 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@
   name='InstanceCommunication.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xd8\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x12#\n\x07metrics\x18\x0f \x01(\x0b\x32\x12.proto.MetricsData\x12\x10\n\x08workerId\x18\x10 \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01 \x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01 \x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\"\x98\x01\n\x07Metrics\x12/\n\x07metrics\x18\x01 \x03(\x0b\x32\x1e.proto.Metrics.InstanceMetrics\x1a\\\n\x0fInstanceMetrics\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\x12\'\n\x0bmetricsData\x18\x03 \x01(\x0b\x32\x12.proto.MetricsData2\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
+  serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb3\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x12\x10\n\x08workerId\x18\x10 \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"V\n\x12\x46unctionStatusList\x12\r\n\x05\x65rror\x18\x02 \x01(\t\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xbd\x02\n\x0bMetricsData\x12\x15\n\rreceivedTotal\x18\x02 \x01(\x03\x12\x16\n\x0eprocessedTotal\x18\x03 \x01(\x03\x12\"\n\x1aprocessedSuccessfullyTotal\x18\x04 \x01(\x03\x12\x1d\n\x15systemExceptionsTotal\x18\x05 \x01(\x03\x12\x1b\n\x13userExceptionsTotal\x18\x06 \x01(\x03\x12\x19\n\x11\x61vgProcessLatency\x18\x07 \x01(\x01\x12\x16\n\x0elastInvocation\x18\x08 \x01(\x03\x12\x38\n\x0buserMetrics\x18\t \x03(\x0b\x32#.proto.MetricsData.UserMetricsEntry\x1a\x32\n\x10UserMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x01:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\"\x98\x01\n\x07Metrics\x12/\n\x07metrics\x18\x01 \x03(\x0b\x32\x1e.proto.Metrics.InstanceMetrics\x1a\\\n\x0fInstanceMetrics\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\x12\'\n\x0bmetricsData\x18\x03 \x01(\x0b\x32\x12.proto.MetricsData2\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -79,8 +79,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=661,
-  serialized_end=730,
+  serialized_start=624,
+  serialized_end=693,
 )
 
 _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
@@ -116,8 +116,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=732,
-  serialized_end=796,
+  serialized_start=695,
+  serialized_end=759,
 )
 
 _FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -226,14 +226,7 @@
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='metrics', full_name='proto.FunctionStatus.metrics', index=14,
-      number=15, type=11, cpp_type=10, label=1,
-      has_default_value=False, default_value=None,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='workerId', full_name='proto.FunctionStatus.workerId', index=15,
+      name='workerId', full_name='proto.FunctionStatus.workerId', index=14,
       number=16, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
@@ -252,7 +245,7 @@
   oneofs=[
   ],
   serialized_start=68,
-  serialized_end=796,
+  serialized_end=759,
 )
 
 
@@ -264,7 +257,14 @@
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='functionStatusList', full_name='proto.FunctionStatusList.functionStatusList', index=0,
+      name='error', full_name='proto.FunctionStatusList.error', index=0,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='functionStatusList', full_name='proto.FunctionStatusList.functionStatusList', index=1,
       number=1, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
@@ -282,109 +282,107 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=798,
-  serialized_end=869,
+  serialized_start=761,
+  serialized_end=847,
 )
 
 
-_METRICSDATA_DATADIGEST = _descriptor.Descriptor(
-  name='DataDigest',
-  full_name='proto.MetricsData.DataDigest',
+_METRICSDATA_USERMETRICSENTRY = _descriptor.Descriptor(
+  name='UserMetricsEntry',
+  full_name='proto.MetricsData.UserMetricsEntry',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='count', full_name='proto.MetricsData.DataDigest.count', index=0,
-      number=1, type=1, cpp_type=5, label=1,
-      has_default_value=False, default_value=float(0),
+      name='key', full_name='proto.MetricsData.UserMetricsEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='sum', full_name='proto.MetricsData.DataDigest.sum', index=1,
+      name='value', full_name='proto.MetricsData.UserMetricsEntry.value', index=1,
       number=2, type=1, cpp_type=5, label=1,
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='max', full_name='proto.MetricsData.DataDigest.max', index=2,
-      number=3, type=1, cpp_type=5, label=1,
-      has_default_value=False, default_value=float(0),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='min', full_name='proto.MetricsData.DataDigest.min', index=3,
-      number=4, type=1, cpp_type=5, label=1,
-      has_default_value=False, default_value=float(0),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=937,
-  serialized_end=1003,
+  serialized_start=1117,
+  serialized_end=1167,
 )
 
-_METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
-  name='MetricsEntry',
-  full_name='proto.MetricsData.MetricsEntry',
+_METRICSDATA = _descriptor.Descriptor(
+  name='MetricsData',
+  full_name='proto.MetricsData',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='key', full_name='proto.MetricsData.MetricsEntry.key', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      name='receivedTotal', full_name='proto.MetricsData.receivedTotal', index=0,
+      number=2, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='value', full_name='proto.MetricsData.MetricsEntry.value', index=1,
-      number=2, type=11, cpp_type=10, label=1,
-      has_default_value=False, default_value=None,
+      name='processedTotal', full_name='proto.MetricsData.processedTotal', index=1,
+      number=3, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=1005,
-  serialized_end=1082,
-)
-
-_METRICSDATA = _descriptor.Descriptor(
-  name='MetricsData',
-  full_name='proto.MetricsData',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
     _descriptor.FieldDescriptor(
-      name='metrics', full_name='proto.MetricsData.metrics', index=0,
-      number=1, type=11, cpp_type=10, label=3,
+      name='processedSuccessfullyTotal', full_name='proto.MetricsData.processedSuccessfullyTotal', index=2,
+      number=4, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='systemExceptionsTotal', full_name='proto.MetricsData.systemExceptionsTotal', index=3,
+      number=5, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='userExceptionsTotal', full_name='proto.MetricsData.userExceptionsTotal', index=4,
+      number=6, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='avgProcessLatency', full_name='proto.MetricsData.avgProcessLatency', index=5,
+      number=7, type=1, cpp_type=5, label=1,
+      has_default_value=False, default_value=float(0),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='lastInvocation', full_name='proto.MetricsData.lastInvocation', index=6,
+      number=8, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='userMetrics', full_name='proto.MetricsData.userMetrics', index=7,
+      number=9, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -392,7 +390,7 @@
   ],
   extensions=[
   ],
-  nested_types=[_METRICSDATA_DATADIGEST, _METRICSDATA_METRICSENTRY, ],
+  nested_types=[_METRICSDATA_USERMETRICSENTRY, ],
   enum_types=[
   ],
   options=None,
@@ -401,8 +399,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=872,
-  serialized_end=1082,
+  serialized_start=850,
+  serialized_end=1167,
 )
 
 
@@ -432,8 +430,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1084,
-  serialized_end=1120,
+  serialized_start=1169,
+  serialized_end=1205,
 )
 
 
@@ -477,8 +475,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1183,
-  serialized_end=1275,
+  serialized_start=1268,
+  serialized_end=1360,
 )
 
 _METRICS = _descriptor.Descriptor(
@@ -507,8 +505,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1123,
-  serialized_end=1275,
+  serialized_start=1208,
+  serialized_end=1360,
 )
 
 _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
@@ -516,12 +514,9 @@
 _FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
 _FUNCTIONSTATUS.fields_by_name['latestSystemExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
 _FUNCTIONSTATUS.fields_by_name['deserializationExceptions'].message_type = _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY
-_FUNCTIONSTATUS.fields_by_name['metrics'].message_type = _METRICSDATA
 _FUNCTIONSTATUSLIST.fields_by_name['functionStatusList'].message_type = _FUNCTIONSTATUS
-_METRICSDATA_DATADIGEST.containing_type = _METRICSDATA
-_METRICSDATA_METRICSENTRY.fields_by_name['value'].message_type = _METRICSDATA_DATADIGEST
-_METRICSDATA_METRICSENTRY.containing_type = _METRICSDATA
-_METRICSDATA.fields_by_name['metrics'].message_type = _METRICSDATA_METRICSENTRY
+_METRICSDATA_USERMETRICSENTRY.containing_type = _METRICSDATA
+_METRICSDATA.fields_by_name['userMetrics'].message_type = _METRICSDATA_USERMETRICSENTRY
 _METRICS_INSTANCEMETRICS.fields_by_name['metricsData'].message_type = _METRICSDATA
 _METRICS_INSTANCEMETRICS.containing_type = _METRICS
 _METRICS.fields_by_name['metrics'].message_type = _METRICS_INSTANCEMETRICS
@@ -564,17 +559,10 @@
 
 MetricsData = _reflection.GeneratedProtocolMessageType('MetricsData', (_message.Message,), dict(
 
-  DataDigest = _reflection.GeneratedProtocolMessageType('DataDigest', (_message.Message,), dict(
-    DESCRIPTOR = _METRICSDATA_DATADIGEST,
-    __module__ = 'InstanceCommunication_pb2'
-    # @@protoc_insertion_point(class_scope:proto.MetricsData.DataDigest)
-    ))
-  ,
-
-  MetricsEntry = _reflection.GeneratedProtocolMessageType('MetricsEntry', (_message.Message,), dict(
-    DESCRIPTOR = _METRICSDATA_METRICSENTRY,
+  UserMetricsEntry = _reflection.GeneratedProtocolMessageType('UserMetricsEntry', (_message.Message,), dict(
+    DESCRIPTOR = _METRICSDATA_USERMETRICSENTRY,
     __module__ = 'InstanceCommunication_pb2'
-    # @@protoc_insertion_point(class_scope:proto.MetricsData.MetricsEntry)
+    # @@protoc_insertion_point(class_scope:proto.MetricsData.UserMetricsEntry)
     ))
   ,
   DESCRIPTOR = _METRICSDATA,
@@ -582,8 +570,7 @@
   # @@protoc_insertion_point(class_scope:proto.MetricsData)
   ))
 _sym_db.RegisterMessage(MetricsData)
-_sym_db.RegisterMessage(MetricsData.DataDigest)
-_sym_db.RegisterMessage(MetricsData.MetricsEntry)
+_sym_db.RegisterMessage(MetricsData.UserMetricsEntry)
 
 HealthCheckResult = _reflection.GeneratedProtocolMessageType('HealthCheckResult', (_message.Message,), dict(
   DESCRIPTOR = _HEALTHCHECKRESULT,
@@ -612,8 +599,8 @@
 DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\025InstanceCommunication'))
 _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY.has_options = True
 _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
-_METRICSDATA_METRICSENTRY.has_options = True
-_METRICSDATA_METRICSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
+_METRICSDATA_USERMETRICSENTRY.has_options = True
+_METRICSDATA_USERMETRICSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 
 _INSTANCECONTROL = _descriptor.ServiceDescriptor(
   name='InstanceControl',
@@ -621,8 +608,8 @@
   file=DESCRIPTOR,
   index=0,
   options=None,
-  serialized_start=1278,
-  serialized_end=1626,
+  serialized_start=1363,
+  serialized_end=1711,
   methods=[
   _descriptor.MethodDescriptor(
     name='GetFunctionStatus',
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 9a37d59ab1..381d612e48 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -29,27 +29,10 @@
 
 import pulsar
 import util
-import InstanceCommunication_pb2
 
 from prometheus_client import Summary
 from function_stats import Stats
 
-# For keeping track of accumulated metrics
-class AccumulatedMetricDatum(object):
-  def __init__(self):
-    self.count = 0.0
-    self.sum = 0.0
-    self.max = float('-inf')
-    self.min = float('inf')
-
-  def update(self, value):
-    self.count += 1
-    self.sum += value
-    if value > self.max:
-      self.max = value
-    if value < self.min:
-      self.min = value
-
 class ContextImpl(pulsar.Context):
 
   # add label to indicate user metric
@@ -62,7 +45,6 @@ def __init__(self, instance_config, logger, pulsar_client, user_code, consumers,
     self.user_code_dir = os.path.dirname(user_code)
     self.consumers = consumers
     self.secrets_provider = secrets_provider
-    self.accumulated_metrics = {}
     self.publish_producers = {}
     self.publish_serializers = {}
     self.current_message_id = None
@@ -132,9 +114,6 @@ def record_metric(self, metric_name, metric_value):
     if metric_name not in self.user_metrics_labels:
       self.user_metrics_labels[metric_name] = self.metrics_labels + [metric_name]
     self.user_metrics_summary.labels(*self.user_metrics_labels[metric_name]).observe(metric_value)
-    if not metric_name in self.accumulated_metrics:
-      self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
-    self.accumulated_metrics[metric_name].update(metric_value)
 
   def get_output_topic(self):
     return self.instance_config.function_details.output
@@ -182,13 +161,11 @@ def reset_metrics(self):
     for labels in self.user_metrics_labels.values():
       self.user_metrics_summary.labels(*labels)._sum.set(0.0)
       self.user_metrics_summary.labels(*labels)._count.set(0.0)
-    self.accumulated_metrics.clear()
 
   def get_metrics(self):
-    metrics = InstanceCommunication_pb2.MetricsData()
-    for metric_name, accumulated_metric in self.accumulated_metrics.items():
-      metrics.metrics[metric_name].count = accumulated_metric.count
-      metrics.metrics[metric_name].sum = accumulated_metric.sum
-      metrics.metrics[metric_name].max = accumulated_metric.max
-      metrics.metrics[metric_name].min = accumulated_metric.min
-    return metrics
+    metrics_map = {}
+    for metric_name, metric_labels in self.user_metrics_labels.items():
+      metrics_map["%s%s_sum" % (Stats.USER_METRIC_PREFIX, metric_name)] = self.user_metrics_summary.labels(*metric_labels)._sum.get()
+      metrics_map["%s%s_count" % (Stats.USER_METRIC_PREFIX, metric_name)] = self.user_metrics_summary.labels(*metric_labels)._count.get()
+
+    return metrics_map
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py
index 13b3f8442f..ff575098d7 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -26,28 +26,31 @@
 class Stats(object):
   metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id', 'cluster']
 
-  TOTAL_PROCESSED = 'pulsar_function_processed_total'
-  TOTAL_SUCCESSFULLY_PROCESSED = 'pulsar_function_processed_successfully_total'
-  TOTAL_SYSTEM_EXCEPTIONS = 'pulsar_function_system_exceptions_total'
-  TOTAL_USER_EXCEPTIONS = 'pulsar_function_user_exceptions_total'
-  PROCESS_LATENCY_MS = 'pulsar_function_process_latency_ms'
-  LAST_INVOCATION = 'pulsar_function_last_invocation'
-  TOTAL_RECEIVED = 'pulsar_function_received_total'
+  PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
+  USER_METRIC_PREFIX = "user_metric_";
+
+  TOTAL_PROCESSED = 'processed_total'
+  TOTAL_SUCCESSFULLY_PROCESSED = 'processed_successfully_total'
+  TOTAL_SYSTEM_EXCEPTIONS = 'system_exceptions_total'
+  TOTAL_USER_EXCEPTIONS = 'user_exceptions_total'
+  PROCESS_LATENCY_MS = 'process_latency_ms'
+  LAST_INVOCATION = 'last_invocation'
+  TOTAL_RECEIVED = 'received_total'
 
   # Declare Prometheus
-  stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names)
-  stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED,
+  stat_total_processed = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names)
+  stat_total_processed_successfully = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED,
                                               'Total number of messages processed successfully.', metrics_label_names)
-  stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number of system exceptions.',
+  stat_total_sys_exceptions = Counter(PULSAR_FUNCTION_METRICS_PREFIX+ TOTAL_SYSTEM_EXCEPTIONS, 'Total number of system exceptions.',
                                       metrics_label_names)
-  stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of user exceptions.',
+  stat_total_user_exceptions = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_USER_EXCEPTIONS, 'Total number of user exceptions.',
                                        metrics_label_names)
 
-  stat_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in milliseconds.', metrics_label_names)
+  stat_process_latency_ms = Summary(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS, 'Process latency in milliseconds.', metrics_label_names)
 
-  stat_last_invocation = Gauge(LAST_INVOCATION, 'The timestamp of the last invocation of the function.', metrics_label_names)
+  stat_last_invocation = Gauge(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION, 'The timestamp of the last invocation of the function.', metrics_label_names)
 
-  stat_total_received = Counter(TOTAL_RECEIVED, 'Total number of messages received from source.', metrics_label_names)
+  stat_total_received = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_RECEIVED, 'Total number of messages received from source.', metrics_label_names)
 
   latest_user_exception = []
   latest_sys_exception = []
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 1d56a3984c..f05e7b2b88 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -286,20 +286,34 @@ def reset_metrics(self):
     self.contextimpl.reset_metrics()
 
   def get_metrics(self):
-    # First get any user metrics
-    metrics = self.contextimpl.get_metrics()
-    # Now add system metrics as well
-    self.add_system_metrics(Stats.TOTAL_PROCESSED, Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics)
-    self.add_system_metrics(Stats.TOTAL_SUCCESSFULLY_PROCESSED, Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(), metrics)
-    self.add_system_metrics(Stats.TOTAL_SYSTEM_EXCEPTIONS, Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(), metrics)
-    self.add_system_metrics(Stats.TOTAL_USER_EXCEPTIONS, Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(), metrics)
-    self.add_system_metrics(Stats.PROCESS_LATENCY_MS,
-                            0.0 if Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0
-                            else Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() / Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get(),
-                            metrics)
-    self.add_system_metrics(Stats.TOTAL_RECEIVED, Stats.stat_total_received.labels(*self.metrics_labels)._value.get(), metrics)
-    self.add_system_metrics(Stats.LAST_INVOCATION, Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get(), metrics)
-    return metrics
+
+    total_received =  Stats.stat_total_received.labels(*self.metrics_labels)._value.get()
+    total_processed = Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()
+    total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
+    total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
+    total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
+    process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
+    process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
+    last_invocation = Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+
+    metrics_data = InstanceCommunication_pb2.MetricsData()
+
+    metrics_data.receivedTotal = int(total_received) if sys.version_info.major >= 3 else long(total_received)
+    metrics_data.processedTotal = int(total_processed) if sys.version_info.major >= 3 else long(total_processed)
+    metrics_data.processedSuccessfullyTotal = int(total_processed_successfully) if sys.version_info.major >= 3 else long(total_processed_successfully)
+    metrics_data.systemExceptionsTotal = int(total_sys_exceptions) if sys.version_info.major >= 3 else long(total_sys_exceptions)
+    metrics_data.userExceptionsTotal = int(total_user_exceptions) if sys.version_info.major >= 3 else long(total_user_exceptions)
+    metrics_data.avgProcessLatency = 0.0 \
+      if process_latency_ms_count <= 0.0 \
+      else process_latency_ms_sum / process_latency_ms_count
+    metrics_data.lastInvocation = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation)
+
+    # get any user metrics
+    user_metrics = self.contextimpl.get_metrics()
+    for metric_name, value in user_metrics.items():
+      metrics_data.userMetrics[metric_name] = value
+
+    return metrics_data
 
   def add_system_metrics(self, metric_name, value, metrics):
     metrics.metrics[metric_name].count = value
@@ -312,30 +326,30 @@ def get_function_status(self):
     status.running = True
 
     total_processed = Stats.stat_total_processed.labels(*self.metrics_labels)._value.get()
-    stat_total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
-    stat_total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
-    stat_total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
-    stat_process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
-    stat_process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
-    stat_last_invocation = Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
-
-    status.numProcessed =  int(total_processed) if sys.version_info.major >= 3 else long(total_processed)
-    status.numSuccessfullyProcessed = int(stat_total_processed_successfully) if sys.version_info.major >= 3 else long(stat_total_processed_successfully)
-    status.numUserExceptions = int(stat_total_user_exceptions) if sys.version_info.major >= 3 else long(stat_total_user_exceptions)
+    total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get()
+    total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get()
+    total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get()
+    process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
+    process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
+    last_invocation = Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+
+    status.numProcessed = int(total_processed) if sys.version_info.major >= 3 else long(total_processed)
+    status.numSuccessfullyProcessed = int(total_processed_successfully) if sys.version_info.major >= 3 else long(total_processed_successfully)
+    status.numUserExceptions = int(total_user_exceptions) if sys.version_info.major >= 3 else long(total_user_exceptions)
     status.instanceId = self.instance_config.instance_id
     for ex, tm in self.stats.latest_user_exception:
       to_add = status.latestUserExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
-    status.numSystemExceptions = int(stat_total_sys_exceptions) if sys.version_info.major >= 3 else long(stat_total_sys_exceptions)
+    status.numSystemExceptions = int(total_sys_exceptions) if sys.version_info.major >= 3 else long(total_sys_exceptions)
     for ex, tm in self.stats.latest_sys_exception:
       to_add = status.latestSystemExceptions.add()
       to_add.exceptionString = ex
       to_add.msSinceEpoch = tm
     status.averageLatency = 0.0 \
-      if stat_process_latency_ms_count <= 0.0 \
-      else stat_process_latency_ms_sum / stat_process_latency_ms_count
-    status.lastInvocationTime = int(stat_last_invocation) if sys.version_info.major >= 3 else long(stat_last_invocation)
+      if process_latency_ms_count <= 0.0 \
+      else process_latency_ms_sum / process_latency_ms_count
+    status.lastInvocationTime = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation)
     return status
 
   def join(self):
diff --git a/pulsar-functions/metrics/pom.xml b/pulsar-functions/metrics/pom.xml
deleted file mode 100644
index ca5e0a5d55..0000000000
--- a/pulsar-functions/metrics/pom.xml
+++ /dev/null
@@ -1,88 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar-functions</artifactId>
-    <version>2.3.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>pulsar-functions-metrics</artifactId>
-  <name>Pulsar Functions :: Metrics</name>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-functions-proto</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-functions-utils</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.beust</groupId>
-      <artifactId>jcommander</artifactId>
-    </dependency>
-
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-            <configuration>
-              <finalName>
-                PrometheusMetricsServer
-              </finalName>
-              <archive>
-                <manifest>
-                  <mainClass>
-                    org.apache.pulsar.functions.sink.PrometheusMetricsServer
-                  </mainClass>
-                </manifest>
-              </archive>
-              <descriptorRefs>
-                <descriptorRef>jar-with-dependencies</descriptorRef>
-              </descriptorRefs>
-              <appendAssemblyId>false</appendAssemblyId>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/MetricsSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/MetricsSink.java
deleted file mode 100644
index d22bf6a316..0000000000
--- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/MetricsSink.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.metrics;
-
-import java.util.Map;
-
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
-
-/**
- * The metrics sink interface. <p>
- * Implementations of this interface consume the {@link MetricsData}
- * The calling entity pushes the {@link MetricsData} to the sink using
- * {@link #processRecord(MetricsData)} method.
- * And {@link #flush()} is called at an interval according to the configuration
- */
-public interface MetricsSink extends AutoCloseable {
-    /**
-     * Initialize the MetricsSink
-     *
-     * @param conf An unmodifiableMap containing basic configuration
-     * Attempts to modify the returned map,
-     * whether direct or via its collection views, result in an UnsupportedOperationException.
-     */
-    void init(Map<String, String> conf);
-
-    /**
-     * Process a metrics record in the sink
-     *  @param record the record to put
-     * @param functionDetails functionDetails that generated this record
-     */
-    void processRecord(MetricsData record, Function.FunctionDetails functionDetails);
-
-    /**
-     * Flush any buffered metrics
-     * It would be called at an interval according to the configuration
-     */
-    void flush();
-
-    /**
-     * Closes this stream and releases any system resources associated
-     * with it. If the stream is already closed then invoking this
-     * method has no effect.
-     */
-    void close();
-}
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java
deleted file mode 100644
index ef9f0cec63..0000000000
--- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.functions.metrics;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.Empty;
-import com.google.protobuf.util.JsonFormat;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.metrics.sink.AbstractWebSink;
-import org.apache.pulsar.functions.metrics.sink.PrometheusSink;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.proto.InstanceControlGrpc;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimerTask;
-import java.util.concurrent.*;
-
-/**
- * A function container implemented using java thread.
- */
-@Slf4j
-public class PrometheusMetricsServer {
-    @Parameter(names = "--function_details", description = "Function details json\n", required = true)
-    protected String functionDetailsJsonString;
-
-    @Parameter(names = "--prometheus_port", description = "Port to listen for prometheus requests\n", required = true)
-    protected int prometheusPort;
-
-    @Parameter(names = "--grpc_port", description = "GRPC Port to query the metrics from instance\n", required = true)
-    protected int grpc_port;
-
-    @Parameter(names = "--collection_interval", description = "Number in seconds between collection interval\n", required = true)
-    protected int metricsCollectionInterval;
-
-    private FunctionDetails functionDetails;
-    private MetricsSink metricsSink;
-    private ManagedChannel channel;
-    private InstanceControlGrpc.InstanceControlFutureStub stub;
-    private ScheduledExecutorService timer;
-
-    public PrometheusMetricsServer() { }
-
-
-    public void start() throws Exception {
-        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-        if (functionDetailsJsonString.charAt(0) == '\'') {
-            functionDetailsJsonString = functionDetailsJsonString.substring(1);
-        }
-        if (functionDetailsJsonString.charAt(functionDetailsJsonString.length() - 1) == '\'') {
-            functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
-        }
-        JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
-        functionDetails = functionDetailsBuilder.build();
-
-        metricsSink = new PrometheusSink();
-        Map<String, String> config = new HashMap<>();
-        config.put(AbstractWebSink.KEY_PATH, "/metrics");
-        config.put(AbstractWebSink.KEY_PORT, String.valueOf(prometheusPort));
-        metricsSink.init(config);
-
-        channel = ManagedChannelBuilder.forAddress("127.0.0.1", grpc_port)
-                .usePlaintext(true)
-                .build();
-        stub = InstanceControlGrpc.newFutureStub(channel);
-
-        if (metricsCollectionInterval > 0) {
-            timer = Executors.newSingleThreadScheduledExecutor();
-            timer.scheduleAtFixedRate(new TimerTask() {
-                @Override
-                public void run() {
-                    CompletableFuture<InstanceCommunication.MetricsData> result = getMetrics();
-                    try {
-                        metricsSink.processRecord(result.get(), functionDetails);
-                    } catch (Exception e) {
-                        log.error("Getting metrics data failed {}/{}/{}",
-                                functionDetails.getTenant(),
-                                functionDetails.getNamespace(),
-                                functionDetails.getName(),
-                                e);
-                    }
-                }
-            }, metricsCollectionInterval, metricsCollectionInterval, TimeUnit.SECONDS);
-        }
-    }
-
-    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
-        CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
-        ListenableFuture<InstanceCommunication.MetricsData> response = stub.withDeadlineAfter(10, TimeUnit.SECONDS).getAndResetMetrics(Empty.newBuilder().build());
-        Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() {
-            @Override
-            public void onFailure(Throwable throwable) {
-                retval.completeExceptionally(throwable);
-            }
-
-            @Override
-            public void onSuccess(InstanceCommunication.MetricsData t) {
-                retval.complete(t);
-            }
-        });
-        return retval;
-    }
-
-    public static void main(String[] args) throws Exception {
-        PrometheusMetricsServer server = new PrometheusMetricsServer();
-        JCommander jcommander = new JCommander(server);
-        jcommander.setProgramName("PrometheusMetricsServer");
-
-        // parse args by JCommander
-        jcommander.parse(args);
-        server.start();
-    }
-}
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/package-info.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/package-info.java
deleted file mode 100644
index 413e52d702..0000000000
--- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Spanwer for spawning processes, threads, docker containers to execute functions.
- */
-package org.apache.pulsar.functions.metrics;
\ No newline at end of file
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
deleted file mode 100644
index cb5541a462..0000000000
--- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.metrics.sink;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Ticker;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
-import com.sun.net.httpserver.HttpServer;
-
-import org.apache.pulsar.functions.metrics.MetricsSink;
-
-
-/**
- * A metrics sink that publishes metrics on a http endpoint
- */
-abstract public class AbstractWebSink implements MetricsSink {
-    private static final Logger LOG = Logger.getLogger(AbstractWebSink.class.getName());
-
-    private static final int HTTP_STATUS_OK = 200;
-
-    // Metrics will be published on http://host:port/path, the port
-    public static final String KEY_PORT = "port";
-
-    // The path
-    public static final String KEY_PATH = "path";
-
-    // Maximum number of metrics getting served
-    private static final String KEY_METRICS_CACHE_MAX_SIZE = "metrics-cache-max-size";
-    private static final String DEFAULT_MAX_CACHE_SIZE = "1000000";
-
-    // Time To Live before a metric gets evicted from the cache
-    private static final String KEY_METRICS_CACHE_TTL_SEC = "metrics-cache-ttl-sec";
-    private static final String DEFAULT_CACHE_TTL_SECONDS = "600";
-
-    private HttpServer httpServer;
-    private long cacheMaxSize;
-    private long cacheTtlSeconds;
-    private final Ticker cacheTicker;
-
-    AbstractWebSink() {
-        this(Ticker.systemTicker());
-    }
-
-    @VisibleForTesting
-    AbstractWebSink(Ticker cacheTicker) {
-        this.cacheTicker = cacheTicker;
-    }
-
-    @Override
-    public final void init(Map<String, String> conf) {
-        String path = conf.get(KEY_PATH);
-
-        cacheMaxSize = Long.valueOf(conf.getOrDefault(KEY_METRICS_CACHE_MAX_SIZE,
-                DEFAULT_MAX_CACHE_SIZE));
-
-        cacheTtlSeconds = Long.valueOf(conf.getOrDefault(KEY_METRICS_CACHE_TTL_SEC,
-                DEFAULT_CACHE_TTL_SECONDS));
-
-        // initialize child classes
-        initialize(conf);
-
-        int port = Integer.valueOf(conf.getOrDefault(KEY_PORT, "9099"));
-        startHttpServer(path, port);
-    }
-
-    /**
-     * Start a http server on supplied port that will serve the metrics, as json,
-     * on the specified path.
-     *
-     * @param path
-     * @param port
-     */
-    protected void startHttpServer(String path, int port) {
-        LOG.info("Starting AbstractWebMetricSink at path" + path + " and port " + port);
-        try {
-            httpServer = HttpServer.create(new InetSocketAddress(port), 0);
-            httpServer.createContext(path, httpExchange -> {
-                byte[] response = generateResponse();
-                httpExchange.sendResponseHeaders(HTTP_STATUS_OK, response.length);
-                OutputStream os = httpExchange.getResponseBody();
-                os.write(response);
-                os.close();
-                LOG.log(Level.INFO, "Received metrics request.");
-            });
-            httpServer.start();
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to create Http server on port " + port, e);
-        }
-    }
-
-    // a convenience method for creating a metrics cache
-    <K, V> Cache<K, V> createCache() {
-        return CacheBuilder.newBuilder()
-                .maximumSize(cacheMaxSize)
-                .expireAfterWrite(cacheTtlSeconds, TimeUnit.SECONDS)
-                .ticker(cacheTicker)
-                .build();
-    }
-
-    abstract byte[] generateResponse() throws IOException;
-
-    abstract void initialize(Map<String, String> configuration);
-
-    @Override
-    public void flush() { }
-
-    @Override
-    public void close() {
-        if (httpServer != null) {
-            httpServer.stop(0);
-        }
-    }
-}
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/FileSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/FileSink.java
deleted file mode 100644
index 64bc97d116..0000000000
--- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/FileSink.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.metrics.sink;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.util.Map;
-import java.util.logging.Logger;
-
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.metrics.MetricsSink;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.Utils;
-
-/**
- * A metrics sink that writes to a file  in json format
- * We would create/overwrite a file every time the flush() in invoked
- * We would save at most fileMaximum metrics file in disk
- */
-public class FileSink implements MetricsSink {
-    private static final Logger LOG = Logger.getLogger(FileSink.class.getName());
-
-    private static final String FILENAME_KEY = "filename-output";
-    private static final String MAXIMUM_FILE_COUNT_KEY = "file-maximum";
-
-    // We would convert a file's metrics into a JSON object, i.e. array
-    // So we need to add "[" at the start and "]" at the end
-    private boolean isFileStart = true;
-    private PrintStream writer;
-    private String filenameKey;
-    private int fileMaximum = 1;
-    private int currentFileIndex = 0;
-
-    @Override
-    public void init(Map<String, String> conf) {
-        verifyConf(conf);
-        filenameKey = conf.get(FILENAME_KEY);
-        fileMaximum = Integer.valueOf(conf.get(MAXIMUM_FILE_COUNT_KEY));
-    }
-
-    private void verifyConf(Map<String, String> conf) {
-        if (!conf.containsKey(FILENAME_KEY)) {
-            throw new IllegalArgumentException("Require: " + FILENAME_KEY);
-        }
-        if (!conf.containsKey(MAXIMUM_FILE_COUNT_KEY)) {
-            throw new IllegalArgumentException("Require: " + MAXIMUM_FILE_COUNT_KEY);
-        }
-    }
-
-    private PrintStream openNewFile(String filename) {
-        // If the file already exists, set it Writable to avoid permission denied
-        File f = new File(filename);
-        if (f.exists() && !f.isDirectory()) {
-            f.setWritable(true);
-        }
-
-        try {
-            return new PrintStream(new FileOutputStream(filename, false), true, "UTF-8");
-        } catch (FileNotFoundException | UnsupportedEncodingException e) {
-            throw new RuntimeException("Error creating " + filename, e);
-        }
-    }
-
-    @Override
-    public void processRecord(InstanceCommunication.MetricsData record, Function.FunctionDetails FunctionDetails) {
-        if (isFileStart) {
-            String filenamePrefix = filenameKey + "." + FunctionDetailsUtils.getFullyQualifiedName(FunctionDetails);
-            writer = openNewFile(String.format("%s.%d", filenamePrefix, currentFileIndex));
-
-            writer.print("[");
-            isFileStart = false;
-        } else {
-            writer.print(",");
-        }
-
-        try {
-            String metrics = Utils.printJson(record);
-            writer.print(metrics);
-        } catch (Exception ex) {
-        }
-    }
-
-    @Override
-    public void flush() {
-        if (isFileStart) {
-            // No record has been processed since the previous flush, so create a new file
-            // and output an empty JSON array.
-            writer = openNewFile(String.format("%s.%d", filenameKey, currentFileIndex));
-            writer.print("[");
-        }
-        writer.print("]");
-        writer.flush();
-        writer.close();
-        new File(String.format("%s.%s", filenameKey, currentFileIndex)).setReadOnly();
-
-        currentFileIndex = (currentFileIndex + 1) % fileMaximum;
-
-        isFileStart = true;
-    }
-
-    @Override
-    public void close() {
-        if (writer != null) {
-            writer.close();
-        }
-    }
-}
\ No newline at end of file
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/PrometheusSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/PrometheusSink.java
deleted file mode 100644
index 26aa940083..0000000000
--- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/PrometheusSink.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.metrics.sink;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.logging.Logger;
-import java.util.regex.Pattern;
-
-import com.google.common.cache.Cache;
-
-import lombok.Getter;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-
-/**
- * A web sink that exposes and endpoint that Prometheus can scrape
- *
- * metrics are generated in a text format and separated with a newline "\n"
- * https://prometheus.io/docs/instrumenting/exposition_formats
- *
- * metrics format:
- * heron_metric{topology="topology-name",component="component-id",instance="instance-id"} value timestamp
- */
-public class PrometheusSink extends AbstractWebSink {
-    private static final Logger LOG = Logger.getLogger(PrometheusSink.class.getName());
-
-    private static final String PREFIX = "pulsar_function";
-
-    private static final String DELIMITER = "\n";
-
-    // This is the cache that is used to serve the metrics
-    @Getter
-    private Cache<String, Map<String, Double>> metricsCache;
-
-    public PrometheusSink() {
-        super();
-    }
-
-    @Override
-    void initialize(Map<String, String> configuration) {
-        metricsCache = createCache();
-    }
-
-    @Override
-    byte[] generateResponse() throws IOException {
-        metricsCache.cleanUp();
-        final Map<String, Map<String, Double>> metrics = metricsCache.asMap();
-        final StringBuilder sb = new StringBuilder();
-
-        metrics.forEach((String source, Map<String, Double> sourceMetrics) -> {
-            String tenant = FunctionDetailsUtils.extractTenantFromFQN(source);
-            String namespace = FunctionDetailsUtils.extractNamespaceFromFQN(source);
-            String name = FunctionDetailsUtils.extractFunctionNameFromFQN(source);
-
-            sourceMetrics.forEach((String metricName, Double value) -> {
-
-                String exportedMetricName = String.format("%s_%s", PREFIX, metricName);
-                sb.append(Prometheus.sanitizeMetricName(exportedMetricName))
-                        .append("{")
-                        .append("tenant=\"").append(tenant).append("\",")
-                        .append("namespace=\"").append(namespace).append("\",")
-                        .append("functionname=\"").append(name).append("\"");
-
-                sb.append("} ")
-                        .append(Prometheus.doubleToGoString(value))
-                        .append(" ").append(currentTimeMillis())
-                        .append(DELIMITER);
-            });
-        });
-
-        return sb.toString().getBytes();
-    }
-
-    @Override
-    public void processRecord(MetricsData record, Function.FunctionDetails functionDetails) {
-        final String source = FunctionDetailsUtils.getFullyQualifiedName(functionDetails);
-
-        Map<String, Double> sourceCache = metricsCache.getIfPresent(source);
-        if (sourceCache == null) {
-            final Cache<String, Double> newSourceCache = createCache();
-            sourceCache = newSourceCache.asMap();
-        }
-
-        sourceCache.putAll(processMetrics(record));
-        metricsCache.put(source, sourceCache);
-    }
-
-    static Map<String, Double> processMetrics(MetricsData metrics) {
-        Map<String, Double> map = new HashMap<>();
-        for (Map.Entry<String, MetricsData.DataDigest> entry : metrics.getMetricsMap().entrySet()) {
-            map.put(entry.getKey() + "_count", entry.getValue().getCount());
-            map.put(entry.getKey() + "_sum", entry.getValue().getSum());
-            map.put(entry.getKey() + "_max", entry.getValue().getMax());
-            map.put(entry.getKey() + "_min", entry.getValue().getMin());
-        }
-
-        return map;
-    }
-
-    long currentTimeMillis() {
-        return System.currentTimeMillis();
-    }
-
-    // code taken from prometheus java_client repo
-    static final class Prometheus {
-        private static final Pattern METRIC_NAME_RE = Pattern.compile("[a-zA-Z_:][a-zA-Z0-9_:]*");
-        private static final Pattern METRIC_LABEL_NAME_RE = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*");
-        private static final Pattern RESERVED_METRIC_LABEL_NAME_RE = Pattern.compile("__.*");
-
-        /**
-         * Throw an exception if the metric name is invalid.
-         */
-        static void checkMetricName(String name) {
-            if (!METRIC_NAME_RE.matcher(name).matches()) {
-                throw new IllegalArgumentException("Invalid metric name: " + name);
-            }
-        }
-
-        private static final Pattern SANITIZE_PREFIX_PATTERN = Pattern.compile("^[^a-zA-Z_]");
-        private static final Pattern SANITIZE_BODY_PATTERN = Pattern.compile("[^a-zA-Z0-9_]");
-
-        /**
-         * Sanitize metric name
-         */
-        static String sanitizeMetricName(String metricName) {
-            return SANITIZE_BODY_PATTERN.matcher(
-                    SANITIZE_PREFIX_PATTERN.matcher(metricName).replaceFirst("_")
-            ).replaceAll("_");
-        }
-
-        /**
-         * Throw an exception if the metric label name is invalid.
-         */
-        static void checkMetricLabelName(String name) {
-            if (!METRIC_LABEL_NAME_RE.matcher(name).matches()) {
-                throw new IllegalArgumentException("Invalid metric label name: " + name);
-            }
-            if (RESERVED_METRIC_LABEL_NAME_RE.matcher(name).matches()) {
-                throw new IllegalArgumentException(
-                        "Invalid metric label name, reserved for internal use: " + name);
-            }
-        }
-
-        /**
-         * Convert a double to its string representation in Go.
-         */
-        static String doubleToGoString(double d) {
-            if (d == Double.POSITIVE_INFINITY) {
-                return "+Inf";
-            }
-            if (d == Double.NEGATIVE_INFINITY) {
-                return "-Inf";
-            }
-            if (Double.isNaN(d)) {
-                return "NaN";
-            }
-            return Double.toString(d);
-        }
-
-        private Prometheus() {
-        }
-    }
-}
\ No newline at end of file
diff --git a/pulsar-functions/metrics/src/main/resources/prometheus_metricsserver_log4j2.yml b/pulsar-functions/metrics/src/main/resources/prometheus_metricsserver_log4j2.yml
deleted file mode 100644
index 3ad41cbedf..0000000000
--- a/pulsar-functions/metrics/src/main/resources/prometheus_metricsserver_log4j2.yml
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-Configuration:
-  name: pulsar-functions-kubernetes-prometheus-metrics-server
-  monitorInterval: 30
-  Appenders:
-    # Console
-    Console:
-      name: Console
-      target: SYSTEM_OUT
-      PatternLayout:
-        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
diff --git a/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/FileSinkTest.java b/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/FileSinkTest.java
deleted file mode 100644
index a77b8a5837..0000000000
--- a/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/FileSinkTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.metrics.sink;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.testng.annotations.Test;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.AfterMethod;
-import static org.testng.Assert.assertEquals;
-import static org.testng.AssertJUnit.fail;
-
-/**
- * FileSink Tester.
- */
-public class FileSinkTest {
-
-    private FileSink fileSink;
-    private File tmpDir;
-
-    @BeforeMethod
-    public void before() throws IOException {
-        fileSink = new FileSink();
-        Map<String, String> conf = new HashMap<>();
-        tmpDir = Files.createTempDirectory("filesink").toFile();
-        conf.put("filename-output", tmpDir.getAbsolutePath() + "/filesink");
-        conf.put("file-maximum", "100");
-        fileSink.init(conf);
-    }
-
-    @AfterMethod
-    public void after() {
-        fileSink.close();
-        for (File file: tmpDir.listFiles()) {
-            file.delete();
-        }
-        tmpDir.delete();
-    }
-
-    /**
-     * Method: flush()
-     */
-    @Test
-    public void testFirstFlushWithoutRecords() throws IOException {
-        fileSink.flush();
-        String content = new String(readFromFile(
-                new File(tmpDir, "/filesink.0").getAbsolutePath()));
-        assertEquals("[]", content);
-    }
-
-    /**
-     * Method: flush()
-     */
-    @Test
-    public void testSuccessiveFlushWithoutRecords() throws UnsupportedEncodingException, IOException {
-        fileSink.flush();
-        fileSink.flush();
-        String content = new String(readFromFile(
-                new File(tmpDir, "/filesink.0").getAbsolutePath()));
-        assertEquals("[]", content);
-        content = new String(readFromFile(new File(tmpDir, "/filesink.1").getAbsolutePath()));
-        assertEquals("[]", content);
-    }
-
-    /**
-     * Method: init()
-     */
-    @Test
-    public void testIllegalConf() {
-        FileSink sink = new FileSink();
-        Map<String, String> conf = new HashMap<>();
-        try {
-            sink.init(conf);
-            fail("Expected IllegalArgumentException.");
-        } catch (IllegalArgumentException e) {
-            assertEquals("Require: filename-output", e.getMessage());
-        }
-
-        sink = new FileSink();
-        conf.put("filename-output", tmpDir.getAbsolutePath() + "/filesink");
-        try {
-            sink.init(conf);
-            fail("Expected IllegalArgumentException.");
-        } catch (IllegalArgumentException e) {
-            assertEquals("Require: file-maximum", e.getMessage());
-        }
-    }
-
-    private String readFromFile(String path) throws IOException {
-        byte[] encoded = Files.readAllBytes(Paths.get(path));
-        return new String(encoded, StandardCharsets.UTF_8);
-    }
-}
diff --git a/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/PrometheusSinkTests.java b/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/PrometheusSinkTests.java
deleted file mode 100644
index 887427f3ab..0000000000
--- a/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/PrometheusSinkTests.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.metrics.sink;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.testng.annotations.Test;
-import org.testng.annotations.BeforeMethod;
-import static org.testng.Assert.assertEquals;
-import static org.testng.AssertJUnit.assertTrue;
-
-public class PrometheusSinkTests {
-
-    private static final long NOW = System.currentTimeMillis();
-
-    private final class PrometheusTestSink extends PrometheusSink {
-
-        private PrometheusTestSink() {
-        }
-
-        @Override
-        protected void startHttpServer(String path, int port) {
-            // no need to start the server for tests
-        }
-
-        public Map<String, Map<String, Double>> getMetrics() {
-            return getMetricsCache().asMap();
-        }
-
-        long currentTimeMillis() {
-            return NOW;
-        }
-    }
-
-    private Map<String, String> defaultConf;
-    private InstanceCommunication.MetricsData records;
-
-    @BeforeMethod
-    public void before() {
-
-        defaultConf = new HashMap<>();
-        defaultConf.put("port", "9999");
-        defaultConf.put("path", "test");
-        defaultConf.put("flat-metrics", "true");
-        defaultConf.put("include-topology-name", "false");
-
-        InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
-        InstanceCommunication.MetricsData.DataDigest metric1 =
-                InstanceCommunication.MetricsData.DataDigest.newBuilder()
-                        .setCount(2).setSum(5).setMax(3).setMin(2).build();
-        bldr.putMetrics("metric_1", metric1);
-        InstanceCommunication.MetricsData.DataDigest metric2 =
-                InstanceCommunication.MetricsData.DataDigest.newBuilder()
-                        .setCount(3).setSum(6).setMax(3).setMin(1).build();
-        bldr.putMetrics("metric_2", metric2);
-
-        records = bldr.build();
-    }
-
-    @Test
-    public void testMetricsGrouping() {
-        PrometheusTestSink sink = new PrometheusTestSink();
-        sink.init(defaultConf);
-        Function.FunctionDetails functionDetails = createFunctionDetails("tenant", "namespace", "functionname");
-        sink.processRecord(records, functionDetails);
-
-        final Map<String, Map<String, Double>> metrics = sink.getMetrics();
-        assertTrue(metrics.containsKey(FunctionDetailsUtils.getFullyQualifiedName(functionDetails)));
-    }
-
-    @Test
-    public void testResponse() throws IOException {
-        PrometheusTestSink sink = new PrometheusTestSink();
-        sink.init(defaultConf);
-        Function.FunctionDetails functionDetails = createFunctionDetails("tenant", "namespace", "functionname");
-        sink.processRecord(records, functionDetails);
-
-        final List<String> expectedLines = Arrays.asList(
-                createMetric(functionDetails, "metric_1_count", 2),
-                createMetric(functionDetails, "metric_1_sum", 5),
-                createMetric(functionDetails, "metric_1_max", 3),
-                createMetric(functionDetails, "metric_1_min", 2),
-                createMetric(functionDetails, "metric_2_count", 3),
-                createMetric(functionDetails, "metric_2_sum", 6),
-                createMetric(functionDetails, "metric_2_max", 3),
-                createMetric(functionDetails, "metric_2_min", 1)
-        );
-
-        final Set<String> generatedLines =
-                new HashSet<>(Arrays.asList(new String(sink.generateResponse()).split("\n")));
-
-        assertEquals(expectedLines.size(), generatedLines.size());
-
-        expectedLines.forEach((String line) -> {
-            assertTrue(generatedLines.contains(line));
-        });
-    }
-
-    private String createMetric(Function.FunctionDetails functionDetails,
-                                String metric, double value) {
-        return String.format("pulsar_function_%s"
-                        + "{tenant=\"%s\",namespace=\"%s\",functionname=\"%s\"}"
-                        + " %s %d",
-                metric, functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), value, NOW);
-    }
-
-    private Function.FunctionDetails createFunctionDetails(String tenant, String namespace, String name) {
-        Function.FunctionDetails.Builder bldr = Function.FunctionDetails.newBuilder();
-        bldr.setTenant(tenant);
-        bldr.setNamespace(namespace);
-        bldr.setName(name);
-        return bldr.build();
-    }
-}
diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml
index a27c21b387..6de88a8399 100644
--- a/pulsar-functions/pom.xml
+++ b/pulsar-functions/pom.xml
@@ -37,7 +37,6 @@
     <module>api-java</module>
     <module>java-examples</module>
     <module>utils</module>
-    <module>metrics</module>
     <module>instance</module>
     <module>runtime</module>
     <module>runtime-shaded</module>
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 6625539d90..d5f5b79ec6 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -49,7 +49,7 @@ message FunctionStatus {
     // expressed in ms since epoch
     int64 lastInvocationTime = 13;
     string instanceId = 14;
-    MetricsData metrics = 15 [deprecated=true];
+//    MetricsData metrics = 15 [deprecated=true];
     // owner of function-instance
     string workerId = 16;
 }
@@ -60,13 +60,37 @@ message FunctionStatusList {
 }
 
 message MetricsData {
-    message DataDigest {
-        double count = 1;
-        double sum = 2;
-        double max = 3;
-        double min = 4;
-    }
-    map<string, DataDigest> metrics = 1;
+//    message DataDigest {
+//        double count = 1;
+//        double sum = 2;
+//        double max = 3;
+//        double min = 4;
+//    }
+//    map<string, DataDigest> metrics = 1 [deprecated=true];
+
+    // Total number of records function received from source
+    int64 receivedTotal = 2;
+
+    // Total number of records processed
+    int64 processedTotal = 3;
+
+    // Total number of records successfully processed by user function
+    int64 processedSuccessfullyTotal = 4;
+
+    // Total number of system exceptions thrown
+    int64 systemExceptionsTotal = 5;
+
+    // Total number of user exceptions thrown
+    int64 userExceptionsTotal = 6;
+
+    // Average process latency for function
+    double avgProcessLatency = 7;
+
+    // Timestamp of when the function was last invoked
+    int64 lastInvocation = 8;
+
+    // User defined metrics
+    map<string, double> userMetrics = 9;
 }
 
 message HealthCheckResult {
diff --git a/pulsar-functions/runtime-shaded/pom.xml b/pulsar-functions/runtime-shaded/pom.xml
index 3adc304117..b725bb2e11 100644
--- a/pulsar-functions/runtime-shaded/pom.xml
+++ b/pulsar-functions/runtime-shaded/pom.xml
@@ -133,7 +133,6 @@
                   <!-- dependencies use protobuf -->
                   <include>org.apache.pulsar:pulsar-functions-proto</include>
                   <include>org.apache.pulsar:pulsar-functions-utils</include>
-                  <include>org.apache.pulsar:pulsar-functions-metrics</include>
                   <include>org.apache.pulsar:pulsar-functions-instance</include>
                   <include>org.apache.pulsar:pulsar-functions-runtime</include>
                   <include>org.apache.pulsar:pulsar-functions-api</include>
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index e0b264d78c..f58999c4b2 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -39,12 +39,6 @@
       <version>${project.version}</version>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-functions-metrics</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-functions-secrets</artifactId>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 4dc97ace14..482943be2c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -24,7 +24,6 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
 import com.google.protobuf.Empty;
-import com.google.protobuf.util.JsonFormat;
 import com.squareup.okhttp.Response;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -59,7 +58,6 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.metrics.PrometheusMetricsServer;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -135,7 +133,6 @@
                       InstanceConfig instanceConfig,
                       String instanceFile,
                       String extraDependenciesDir,
-                      String prometheusMetricsServerJarFile,
                       String logDirectory,
                       String userCodePkgUrl,
                       String originalCodeFileName,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 3e06c0367c..bfa0eb38d1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -76,7 +76,6 @@
     private final String javaInstanceJarFile;
     private final String pythonInstanceFile;
     private final String extraDependenciesDir;
-    private final String prometheusMetricsServerJarFile;
     private final SecretsProviderConfigurator secretsProviderConfigurator;
     private final String logDirectory = "logs/functions";
     private Timer changeConfigMapTimer;
@@ -143,7 +142,6 @@ public KubernetesRuntimeFactory(String k8Uri,
         this.authConfig = authConfig;
         this.javaInstanceJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/java-instance.jar";
         this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
-        this.prometheusMetricsServerJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/PrometheusMetricsServer.jar";
         this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
     }
@@ -182,7 +180,6 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
             instanceConfig,
             instanceFile,
             extraDependenciesDir,
-            prometheusMetricsServerJarFile,
             logDirectory,
             codePkgUrl,
             originalCodeFileName,
diff --git a/pulsar-functions/worker-shaded/pom.xml b/pulsar-functions/worker-shaded/pom.xml
index 8c73a930e5..eedb3a6590 100644
--- a/pulsar-functions/worker-shaded/pom.xml
+++ b/pulsar-functions/worker-shaded/pom.xml
@@ -121,7 +121,6 @@
                   <!-- dependencies use protobuf -->
                   <include>org.apache.pulsar:pulsar-functions-proto</include>
                   <include>org.apache.pulsar:pulsar-functions-utils</include>
-                  <include>org.apache.pulsar:pulsar-functions-metrics</include>
                   <include>org.apache.pulsar:pulsar-functions-instance</include>
                   <include>org.apache.pulsar:pulsar-functions-runtime</include>
                   <include>org.apache.pulsar:pulsar-functions-worker</include>
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 8a8fafa1a5..add4013003 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -36,7 +36,6 @@
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 
-import io.prometheus.client.CollectorRegistry;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -46,6 +45,7 @@
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -55,10 +55,10 @@
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
 
 /**
@@ -301,89 +301,6 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) {
         }
     }
 
-    /**
-     * Get status of a function instance.  If this worker is not running the function instance,
-     * @param tenant the tenant the function belongs to
-     * @param namespace the namespace the function belongs to
-     * @param functionName the function name
-     * @param instanceId the function instance id
-     * @return the function status
-     */
-    public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace,
-            String functionName, int instanceId, URI uri) {
-        Assignment assignment;
-        if (runtimeFactory.externallyManaged()) {
-            assignment = this.findAssignment(tenant, namespace, functionName, -1);
-        } else {
-            assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
-        }
-        final String assignedWorkerId = assignment.getWorkerId();
-        final String workerId = this.workerConfig.getWorkerId();
-        
-        if (assignment == null) {
-            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
-                    = InstanceCommunication.FunctionStatus.newBuilder();
-            functionStatusBuilder.setRunning(false);
-            functionStatusBuilder.setFailureException("Function has not been scheduled");
-            return functionStatusBuilder.build();
-        }
-
-        InstanceCommunication.FunctionStatus functionStatus = null;
-        // If I am running worker
-        if (assignedWorkerId.equals(workerId)) {
-            FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
-                    Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
-            RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
-            if (runtimeSpawner != null) {
-                try {
-                    InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus
-                            .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get());
-                    functionStatusBuilder.setWorkerId(assignedWorkerId);
-                    functionStatus = functionStatusBuilder.build();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RuntimeException(e);
-                }
-            } else {
-                InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
-                        = InstanceCommunication.FunctionStatus.newBuilder();
-                functionStatusBuilder.setRunning(false);
-                functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
-                if (functionRuntimeInfo.getStartupException() != null) {
-                    functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
-                }
-                functionStatusBuilder.setWorkerId(assignedWorkerId);
-                functionStatus = functionStatusBuilder.build();
-            }
-        } else {
-            // query other worker
-
-            List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
-            WorkerInfo workerInfo = null;
-            for (WorkerInfo entry: workerInfoList) {
-                if (assignment.getWorkerId().equals(entry.getWorkerId())) {
-                    workerInfo = entry;
-                }
-            }
-            if (workerInfo == null) {
-                InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
-                        = InstanceCommunication.FunctionStatus.newBuilder();
-                functionStatusBuilder.setRunning(false);
-                functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
-                functionStatusBuilder.setFailureException("Function has not been scheduled");
-                return functionStatusBuilder.build();
-            }
-
-            if (uri == null) {
-                throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
-            } else {
-                URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
-                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
-            }
-        }
-
-        return functionStatus;
-    }
-
     public Response stopFunctionInstance(String tenant, String namespace, String functionName, int instanceId,
             boolean restart, URI uri) throws Exception {
         if (runtimeFactory.externallyManaged()) {
@@ -537,6 +454,223 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro
         }
     }
 
+    /**
+     * Get stats of a function instance.  If this worker is not running the function instance,
+     * @param tenant the tenant the function belongs to
+     * @param namespace the namespace the function belongs to
+     * @param functionName the function name
+     * @param instanceId the function instance id
+     * @return jsonObject containing stats for instance
+     */
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(String tenant, String namespace,
+                                                                        String functionName, int instanceId, URI uri) {
+        Assignment assignment;
+        if (runtimeFactory.externallyManaged()) {
+            assignment = this.findAssignment(tenant, namespace, functionName, -1);
+        } else {
+            assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
+        }
+
+        if (assignment == null) {
+            return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
+        }
+
+        final String assignedWorkerId = assignment.getWorkerId();
+        final String workerId = this.workerConfig.getWorkerId();
+
+        // If I am running worker
+        if (assignedWorkerId.equals(workerId)) {
+            FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
+                    Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+            RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
+            if (runtimeSpawner != null) {
+                return Utils.getFunctionInstanceStats(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo).getMetrics();
+            }
+            return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
+        } else {
+            // query other worker
+
+            List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
+            WorkerInfo workerInfo = null;
+            for (WorkerInfo entry: workerInfoList) {
+                if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                    workerInfo = entry;
+                }
+            }
+            if (workerInfo == null) {
+                return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
+            }
+
+            if (uri == null) {
+                throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+            } else {
+                URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            }
+        }
+    }
+
+    /**
+     * Get stats of all function instances.
+     * @param tenant the tenant the function belongs to
+     * @param namespace the namespace the function belongs to
+     * @param functionName the function name
+     * @return a list of function statuses
+     * @throws PulsarAdminException
+     */
+    public FunctionStats getFunctionStats(String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException {
+        Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
+
+        FunctionStats functionStats = new FunctionStats();
+        if (assignments.isEmpty()) {
+            return functionStats;
+        }
+
+        if (runtimeFactory.externallyManaged()) {
+            Assignment assignment = assignments.iterator().next();
+            boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
+            if (isOwner) {
+                int parallelism = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
+                for (int i = 0; i < parallelism; ++i) {
+
+                    FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData = getFunctionInstanceStats(tenant, namespace,
+                            functionName, i, null);
+
+                    FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
+                    functionInstanceStats.setInstanceId(i);
+                    functionInstanceStats.setMetrics(functionInstanceStatsData);
+                    functionStats.addInstance(functionInstanceStats);
+                }
+            } else {
+                // find the hostname/port of the worker who is the owner
+
+                List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
+                WorkerInfo workerInfo = null;
+                for (WorkerInfo entry: workerInfoList) {
+                    if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                        workerInfo = entry;
+                    }
+                }
+                if (workerInfo == null) {
+                    return functionStats;
+                }
+
+                if (uri == null) {
+                    throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+                } else {
+                    URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+                }
+            }
+        } else {
+            for (Assignment assignment : assignments) {
+                boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
+
+                FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData;
+                if (isOwner) {
+                    functionInstanceStatsData = getFunctionInstanceStats(tenant, namespace, functionName,
+                            assignment.getInstance().getInstanceId(), null);
+                } else {
+                    functionInstanceStatsData = this.functionAdmin.functions().getFunctionStats(
+                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+                            assignment.getInstance().getInstanceId());
+                }
+
+                FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
+                functionInstanceStats.setInstanceId(assignment.getInstance().getInstanceId());
+                functionInstanceStats.setMetrics(functionInstanceStatsData);
+                functionStats.addInstance(functionInstanceStats);
+            }
+        }
+        return functionStats.calculateOverall();
+    }
+
+    /**
+     * Get status of a function instance.  If this worker is not running the function instance,
+     * @param tenant the tenant the function belongs to
+     * @param namespace the namespace the function belongs to
+     * @param functionName the function name
+     * @param instanceId the function instance id
+     * @return the function status
+     */
+    public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace,
+                                                                          String functionName, int instanceId, URI uri) {
+        Assignment assignment;
+        if (runtimeFactory.externallyManaged()) {
+            assignment = this.findAssignment(tenant, namespace, functionName, -1);
+        } else {
+            assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
+        }
+
+        if (assignment == null) {
+            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
+                    = InstanceCommunication.FunctionStatus.newBuilder();
+            functionStatusBuilder.setRunning(false);
+            functionStatusBuilder.setFailureException("Function has not been scheduled");
+            return functionStatusBuilder.build();
+        }
+
+        final String assignedWorkerId = assignment.getWorkerId();
+        final String workerId = this.workerConfig.getWorkerId();
+
+        InstanceCommunication.FunctionStatus functionStatus = null;
+        // If I am running worker
+        if (assignedWorkerId.equals(workerId)) {
+            FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
+                    Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+            RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
+            if (runtimeSpawner != null) {
+                try {
+                    InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus
+                            .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get());
+                    functionStatusBuilder.setWorkerId(assignedWorkerId);
+                    functionStatus = functionStatusBuilder.build();
+                } catch (InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            } else {
+                InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
+                        = InstanceCommunication.FunctionStatus.newBuilder();
+                functionStatusBuilder.setRunning(false);
+                functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
+                if (functionRuntimeInfo.getStartupException() != null) {
+                    functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
+                }
+                functionStatusBuilder.setWorkerId(assignedWorkerId);
+                functionStatus = functionStatusBuilder.build();
+            }
+        } else {
+            // query other worker
+
+            List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
+            WorkerInfo workerInfo = null;
+            for (WorkerInfo entry: workerInfoList) {
+                if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                    workerInfo = entry;
+                }
+            }
+            if (workerInfo == null) {
+                InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
+                        = InstanceCommunication.FunctionStatus.newBuilder();
+                functionStatusBuilder.setRunning(false);
+                functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
+                functionStatusBuilder.setFailureException("Function has not been scheduled");
+                return functionStatusBuilder.build();
+            }
+
+            if (uri == null) {
+                throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+            } else {
+                URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            }
+        }
+
+        return functionStatus;
+    }
+
     /**
      * Get statuses of all function instances.
      * @param tenant the tenant the function belongs to
@@ -545,8 +679,9 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro
      * @return a list of function statuses
      * @throws PulsarAdminException 
      */
-    public InstanceCommunication.FunctionStatusList getAllFunctionStatus(
-            String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException {
+    public InstanceCommunication.FunctionStatusList getAllFunctionStatus(String tenant, String namespace,
+                                                                         String functionName, URI uri)
+            throws PulsarAdminException {
 
         Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
 
@@ -555,6 +690,7 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro
             return functionStatusListBuilder.build();
         }
 
+        // TODO refactor the code for externally managed.
         if (runtimeFactory.externallyManaged()) {
             Assignment assignment = assignments.iterator().next();
             boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index d2f1504510..77228ac224 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -18,17 +18,16 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import org.apache.pulsar.functions.instance.FunctionStatsManager;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -59,30 +58,30 @@ public static void generate(WorkerService workerService, String cluster, SimpleT
                     if (functionRuntime != null) {
                         try {
                             InstanceCommunication.MetricsData metrics = functionRuntime.getMetrics().get();
-                            for (Map.Entry<String, InstanceCommunication.MetricsData.DataDigest> metricsEntry
-                                    : metrics.getMetricsMap().entrySet()) {
-                                String metricName = metricsEntry.getKey();
-                                InstanceCommunication.MetricsData.DataDigest dataDigest = metricsEntry.getValue();
 
-                                String tenant = functionRuntimeInfo.getFunctionInstance()
-                                        .getFunctionMetaData().getFunctionDetails().getTenant();
-                                String namespace = functionRuntimeInfo.getFunctionInstance()
-                                        .getFunctionMetaData().getFunctionDetails().getNamespace();
-                                String name = functionRuntimeInfo.getFunctionInstance()
-                                        .getFunctionMetaData().getFunctionDetails().getName();
-                                int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
-                                String qualifiedNamespace = String.format("%s/%s", tenant, namespace);
+                            String tenant = functionRuntimeInfo.getFunctionInstance()
+                                    .getFunctionMetaData().getFunctionDetails().getTenant();
+                            String namespace = functionRuntimeInfo.getFunctionInstance()
+                                    .getFunctionMetaData().getFunctionDetails().getNamespace();
+                            String name = functionRuntimeInfo.getFunctionInstance()
+                                    .getFunctionMetaData().getFunctionDetails().getName();
+                            int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                            String qualifiedNamespace = String.format("%s/%s", tenant, namespace);
 
-                                metric(out, cluster, qualifiedNamespace, name, String.format("%s_count", metricName),
-                                        instanceId, dataDigest.getCount());
-                                metric(out, cluster, qualifiedNamespace, name, String.format("%s_max", metricName),
-                                        instanceId, dataDigest.getMax());
-                                metric(out, cluster, qualifiedNamespace,name, String.format("%s_min", metricName),
-                                        instanceId, dataDigest.getMin());
-                                metric(out, cluster, qualifiedNamespace, name, String.format("%s_sum", metricName),
-                                        instanceId, dataDigest.getSum());
+                            metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESS_LATENCY_MS, instanceId, metrics.getAvgProcessLatency());
+                            metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation());
+                            metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId, metrics.getProcessedSuccessfullyTotal());
+                            metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESSED_TOTAL, instanceId, metrics.getProcessedTotal());
+                            metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal());
+                            metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId, metrics.getSystemExceptionsTotal());
+                            metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId, metrics.getUserExceptionsTotal());
 
+                            for (Map.Entry<String, Double> userMetricsMapEntry : metrics.getUserMetricsMap().entrySet()) {
+                                String userMetricName = userMetricsMapEntry.getKey();
+                                Double val = userMetricsMapEntry.getValue();
+                                metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + userMetricName, instanceId, val);
                             }
+
                         } catch (InterruptedException | ExecutionException e) {
                             log.warn("Failed to collect metrics for function instance {}",
                                     fullyQualifiedInstanceName, e);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 41f04af445..2cd51b84ea 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -23,7 +23,12 @@
 import java.net.URL;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
 import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -37,6 +42,11 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
 import org.apache.zookeeper.KeeperException.Code;
@@ -194,5 +204,53 @@ public static String getFullyQualifiedInstanceId(String tenant, String namespace
                                                      String functionName, int instanceId) {
         return String.format("%s/%s/%s:%d", tenant, namespace, functionName, instanceId);
     }
-    
+
+    public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName, FunctionRuntimeInfo functionRuntimeInfo) {
+        RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
+
+        FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
+        if (functionRuntimeSpawner != null) {
+            Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+            if (functionRuntime != null) {
+                try {
+
+                    InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get();
+                    int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                    functionInstanceStats.setInstanceId(instanceId);
+
+                    FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData
+                            = new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
+
+                    functionInstanceStatsData.setReceivedTotal(metricsData.getReceivedTotal());
+                    functionInstanceStatsData.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal());
+                    functionInstanceStatsData.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal());
+                    functionInstanceStatsData.setUserExceptionsTotal(metricsData.getUserExceptionsTotal());
+                    functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency());
+                    functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation());
+
+                    // Filter out values that are NaN
+                    Map<String, Double> statsDataMap = metricsData.getUserMetricsMap().entrySet().stream()
+                            .filter(stringDoubleEntry -> !stringDoubleEntry.getValue().isNaN())
+                            .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
+
+                    functionInstanceStatsData.setUserMetrics(statsDataMap);
+
+                    functionInstanceStats.setMetrics(functionInstanceStatsData);
+                } catch (InterruptedException | ExecutionException e) {
+                    log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e);
+                }
+            }
+        }
+        return functionInstanceStats;
+    }
+
+    public static FunctionStats getFunctionStats(Map<String, FunctionRuntimeInfo> functionRuntimes) {
+        FunctionStats functionStats = new FunctionStats();
+        for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
+            String fullyQualifiedInstanceName = entry.getKey();
+            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
+            functionStats.addInstance(Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo));
+        }
+        return functionStats;
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index af8c050b2a..18de1eb734 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -26,6 +26,7 @@
 
 import com.google.gson.Gson;
 
+import com.google.gson.GsonBuilder;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -84,6 +85,7 @@
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -521,8 +523,18 @@ public Response getFunctionInfo(final String tenant, final String namespace, fin
         return Response.status(Status.OK).entity(retval).build();
     }
 
-    public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName,
-            final String componentType, final String instanceId, URI uri) throws IOException {
+    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, URI uri) {
+        return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, false, uri);
+    }
+
+    public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, URI uri) {
+        return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, true, uri);
+    }
+
+    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, boolean restart, URI uri) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -532,58 +544,49 @@ public Response getFunctionInstanceStatus(final String tenant, final String name
         try {
             validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
+            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
         }
+
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!calculateSubjectType(functionMetaData).equals(componentType)) {
             log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
-        int instanceIdInt = Integer.parseInt(instanceId);
-        if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
-            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Invalid InstanceId"))).build();
-        }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
-        FunctionStatus functionStatus = null;
         try {
-            functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, componentName,
-                    Integer.parseInt(instanceId), uri);
+            return functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName,
+                    Integer.parseInt(instanceId), restart, uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e);
+            log.error("Failed to restart {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
-
-        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatus);
-        return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
-    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
-            final String componentType, final String instanceId, URI uri) {
-        return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, false, uri);
+    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
+                                          final String componentType) {
+        return stopFunctionInstances(tenant, namespace, componentName, componentType, false);
     }
 
-    public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName,
-            final String componentType, final String instanceId, URI uri) {
-        return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, true, uri);
+    public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName,
+                                             final String componentType) {
+        return stopFunctionInstances(tenant, namespace, componentName, componentType, true);
     }
 
-    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
-            final String componentType, final String instanceId, boolean restart, URI uri) {
+    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
+            final String componentType, boolean restart) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -591,7 +594,7 @@ public Response stopFunctionInstance(final String tenant, final String namespace
 
         // validate parameters
         try {
-            validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
+            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
             log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -600,7 +603,7 @@ public Response stopFunctionInstance(final String tenant, final String namespace
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
+            log.error("{} in getFunctionStatus does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
         }
@@ -614,28 +617,17 @@ public Response stopFunctionInstance(final String tenant, final String namespace
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         try {
-            return functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName,
-                    Integer.parseInt(instanceId), restart, uri);
+            return functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Failed to restart {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e);
+            log.error("Failed to restart {}: {}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
     }
 
-    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
-                                          final String componentType) {
-        return stopFunctionInstances(tenant, namespace, componentName, componentType, false);
-    }
-
-    public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName,
-                                             final String componentType) {
-        return stopFunctionInstances(tenant, namespace, componentName, componentType, true);
-    }
-
-    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
-            final String componentType, boolean restart) {
+    public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName,
+                                              final String componentType, final String instanceId, URI uri) throws IOException {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -643,36 +635,46 @@ public Response stopFunctionInstances(final String tenant, final String namespac
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
+            validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} in getFunctionStatus does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
+            log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
         }
-
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!calculateSubjectType(functionMetaData).equals(componentType)) {
             log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
+        int instanceIdInt = Integer.parseInt(instanceId);
+        if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
+            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Invalid InstanceId"))).build();
+        }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
+        FunctionStatus functionStatus = null;
         try {
-            return functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart);
+            functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, componentName,
+                    Integer.parseInt(instanceId), uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Failed to restart {}: {}/{}/{}", componentType, tenant, namespace, componentName, e);
-            return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e);
+            return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new ErrorData(e.getMessage())).build();
         }
+
+        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatus);
+        return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
     public Response getFunctionStatus(final String tenant, final String namespace, final String componentName,
@@ -713,18 +715,109 @@ public Response getFunctionStatus(final String tenant, final String namespace, f
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Got Exception Getting Status", e);
-            FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
-            functionStatusBuilder.setRunning(false);
-            String functionDetailsJson = org.apache.pulsar.functions.utils.Utils
-                    .printJson(functionStatusBuilder.build());
-            return Response.status(Status.OK).entity(functionDetailsJson).build();
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e);
+            return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new ErrorData(e.getMessage())).build();
         }
 
         String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatusList);
         return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
+    private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+
+    public Response getFunctionStats(final String tenant, final String namespace, final String componentName,
+                                     final String componentType, URI uri) throws IOException {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
+        }
+
+        FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
+        FunctionStats functionStats;
+        try {
+            functionStats = functionRuntimeManager.getFunctionStats(tenant, namespace, componentName, uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e);
+            return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new ErrorData(e.getMessage())).build();
+        }
+
+        return Response.status(Status.OK).entity(gson.toJson(functionStats)).build();
+
+    }
+
+    public Response getFunctionsInstanceStats(final String tenant, final String namespace, final String componentName,
+                                      final String componentType, String instanceId, URI uri) {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid get {} Stats request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} in get {} Stats does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
+        }
+        int instanceIdInt = Integer.parseInt(instanceId);
+        if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
+            log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Invalid InstanceId"))).build();
+        }
+
+        FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
+        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData;
+        try {
+            functionInstanceStatsData = functionRuntimeManager.getFunctionInstanceStats(tenant, namespace, componentName,
+                    Integer.parseInt(instanceId), uri);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("{}/{}/{} Got Exception Getting Stats", tenant, namespace, componentName, e);
+            return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new ErrorData(e.getMessage())).build();
+        }
+
+        return Response.status(Status.OK).entity(gson.toJson(functionInstanceStatsData)).build();
+    }
+
     public Response listFunctions(final String tenant, final String namespace, String componentType) {
 
         if (!isWorkerServiceAvailable()) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 88c0a17f0a..bafb7a9475 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -19,15 +19,14 @@
 package org.apache.pulsar.functions.worker.rest.api;
 
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.worker.*;
 
 import javax.ws.rs.WebApplicationException;
@@ -36,7 +35,6 @@
 import javax.ws.rs.core.Response.Status;
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -46,6 +44,8 @@
 
     private final Supplier<WorkerService> workerServiceSupplier;
 
+    private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+
     public WorkerImpl(Supplier<WorkerService> workerServiceSupplier) {
         this.workerServiceSupplier = workerServiceSupplier;
     }
@@ -122,16 +122,16 @@ public boolean isSuperUser(String clientRole) {
         return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
 
-    public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis(String clientRole) throws IOException {
+    public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(String clientRole) throws IOException {
         if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
             log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
             throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(clientRole + " is not authorize to get metrics")).build());
         }
-        return getWorkerMetrcis();
+        return getWorkerMetrics();
     }
 
-    private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() {
+    private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics() {
         if (!isWorkerServiceAvailable()) {
             throw new WebApplicationException(
                     Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
@@ -149,6 +149,13 @@ public Response getFunctionsMetrics(String clientRole) throws IOException {
         return getFunctionsMetrics();
     }
 
+    @Data
+    public static class WorkerFunctionInstanceStats {
+        /** fully qualified function instance name **/
+        public String name;
+        public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics;
+    }
+
     private Response getFunctionsMetrics() throws IOException {
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -158,41 +165,22 @@ private Response getFunctionsMetrics() throws IOException {
         Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager()
                 .getFunctionRuntimeInfos();
 
-        Metrics.Builder metricsBuilder = Metrics.newBuilder();
+        JsonArray metricsMapList = new JsonArray();
+
         for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
             String fullyQualifiedInstanceName = entry.getKey();
             FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
-            RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
-
-            if (functionRuntimeSpawner != null) {
-                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
-                if (functionRuntime != null) {
-                    try {
-                        InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get();
-
-                        String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                                .getFunctionDetails().getTenant();
-                        String namespace = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                                .getFunctionDetails().getNamespace();
-                        String name = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                                .getFunctionDetails().getName();
-                        int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
-                        String qualifiedFunctionName = String.format("%s/%s/%s", tenant, namespace, name);
-
-                        InstanceMetrics.Builder instanceBuilder = InstanceMetrics.newBuilder();
-                        instanceBuilder.setName(qualifiedFunctionName);
-                        instanceBuilder.setInstanceId(instanceId);
-                        if (metricsData != null) {
-                            instanceBuilder.setMetricsData(metricsData);
-                        }
-                        metricsBuilder.addMetrics(instanceBuilder.build());
-                    } catch (InterruptedException | ExecutionException e) {
-                        log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e);
-                    }
-                }
-            }
+
+            FunctionStats.FunctionInstanceStats functionInstanceStats = Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
+
+            WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
+            workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
+            workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
+
+            metricsMapList.add(gson.toJsonTree(workerFunctionInstanceStats));
         }
-        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
+        String jsonResponse = gson.toJson(metricsMapList);
+
         return Response.status(Status.OK).entity(jsonResponse).build();
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index e37a88a38a..3412d2e96a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -120,12 +122,37 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
     }
 
     @GET
-    @Path("/{tenant}/{namespace}")
-    public Response listFunctions(final @PathParam("tenant") String tenant,
-                                  final @PathParam("namespace") String namespace) {
-        return functions.listFunctions(
-            tenant, namespace, FunctionsImpl.FUNCTION);
+    @ApiOperation(
+            value = "Displays the stats of a Pulsar Function",
+            response = FunctionStats.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/stats")
+    public Response getFunctionStats(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName) throws IOException {
+        return functions.getFunctionStats(tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
+    }
 
+    @GET
+    @ApiOperation(
+            value = "Displays the stats of a Pulsar Function instance",
+            response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
+    public Response getFunctionInstanceStats(final @PathParam("tenant") String tenant,
+                                             final @PathParam("namespace") String namespace,
+                                             final @PathParam("functionName") String functionName,
+                                             final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionsInstanceStats(
+                tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
     }
 
     @POST
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
index c802da7c6f..022ebd3126 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
@@ -80,12 +80,12 @@ public String clientAppId() {
     @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
     public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
-        return worker.getWorkerMetrcis(clientAppId());
+        return worker.getWorkerMetrics(clientAppId());
     }
 
     @GET
     @Path("/functionsmetrics")
-    @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
+    @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = WorkerImpl.WorkerFunctionInstanceStats.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 503, message = "Worker service is not running") })
     public Response getFunctionsMetrics() throws IOException {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 490be77732..20e4897559 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -28,9 +28,7 @@
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.functions.metrics.MetricsSink;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -41,7 +39,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Matchers.any;
@@ -59,29 +56,6 @@
 @Slf4j
 public class FunctionRuntimeManagerTest {
 
-    public static class TestSink implements MetricsSink {
-
-        @Override
-        public void init(Map<String, String> conf) {
-
-        }
-
-        @Override
-        public void processRecord(InstanceCommunication.MetricsData record, Function.FunctionDetails functionDetails) {
-
-        }
-
-        @Override
-        public void flush() {
-
-        }
-
-        @Override
-        public void close() {
-
-        }
-    }
-
     @Test
     public void testProcessAssignmentUpdateAddFunctions() throws Exception {
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index c957de7e4b..21a2852d99 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -86,13 +86,13 @@ public void testFunctionsStatsGenerate() {
 
         CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>();
         InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()
-                .putMetrics(
-                        "pulsar_function_processed_total",
-                        InstanceCommunication.MetricsData.DataDigest.newBuilder()
-                                .setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
-                .putMetrics("pulsar_function_process_latency_ms",
-                        InstanceCommunication.MetricsData.DataDigest.newBuilder()
-                                .setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
+                .setReceivedTotal(101)
+                .setProcessedTotal(100)
+                .setProcessedSuccessfullyTotal(99)
+                .setAvgProcessLatency(10.0)
+                .setUserExceptionsTotal(3)
+                .setSystemExceptionsTotal(1)
+                .setLastInvocation(1542324900)
                 .build();
 
         metricsDataCompletableFuture.complete(metricsData);
@@ -121,67 +121,61 @@ public void testFunctionsStatsGenerate() {
         FunctionsStatsGenerator.generate(workerService, "default", statsOut);
 
         String str = buf.toString(Charset.defaultCharset());
+
         buf.release();
         Map<String, Metric> metrics = parseMetrics(str);
 
-        Assert.assertEquals(metrics.size(), 8);
+        Assert.assertEquals(metrics.size(), 7);
 
         System.out.println("metrics: " + metrics);
-        Metric m = metrics.get("pulsar_function_processed_total_count");
-        assertEquals(m.tags.get("cluster"), "default");
-        assertEquals(m.tags.get("instanceId"), "0");
-        assertEquals(m.tags.get("name"), "func-1");
-        assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 100.0);
-
-        m = metrics.get("pulsar_function_processed_total_max");
+        Metric m = metrics.get("pulsar_function_received_total");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 200.0);
+        assertEquals(m.value, 101.0);
 
-        m = metrics.get("pulsar_function_processed_total_sum");
+        m = metrics.get("pulsar_function_processed_total");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 300.0);
+        assertEquals(m.value, 100.0);
 
-        m = metrics.get("pulsar_function_processed_total_min");
+        m = metrics.get("pulsar_function_user_exceptions_total");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 0.0);
+        assertEquals(m.value, 3.0);
 
-        m = metrics.get("pulsar_function_process_latency_ms_count");
+        m = metrics.get("pulsar_function_process_latency_ms");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
         assertEquals(m.value, 10.0);
 
-        m = metrics.get("pulsar_function_process_latency_ms_max");
+        m = metrics.get("pulsar_function_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 20.0);
+        assertEquals(m.value, 1.0);
 
-        m = metrics.get("pulsar_function_process_latency_ms_sum");
+        m = metrics.get("pulsar_function_last_invocation");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 30.0);
+        assertEquals(m.value, 1542324900.0);
 
-        m = metrics.get("pulsar_function_process_latency_ms_min");
+        m = metrics.get("pulsar_function_processed_successfully_total");
         assertEquals(m.tags.get("cluster"), "default");
         assertEquals(m.tags.get("instanceId"), "0");
         assertEquals(m.tags.get("name"), "func-1");
         assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
-        assertEquals(m.value, 0.0);
+        assertEquals(m.value, 99.0);
     }
 
     /**
diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml
index 4f49eef66e..70bc75764c 100644
--- a/pulsar-spark/pom.xml
+++ b/pulsar-spark/pom.xml
@@ -34,34 +34,6 @@
   <name>Spark Streaming Pulsar Receivers</name>
 
   <dependencies>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-broker</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-broker</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>managed-ledger-original</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
 
     <dependency>
       <groupId>${project.groupId}</groupId>
@@ -97,7 +69,6 @@
             </goals>
             <configuration>
               <createDependencyReducedPom>true</createDependencyReducedPom>
-              <promoteTransitiveDependencies>ture</promoteTransitiveDependencies>
               <artifactSet>
                 <includes>
                   <include>com.google.guava:guava</include>
diff --git a/tests/pom.xml b/tests/pom.xml
index 6c4b094d4c..e6f6ba7e67 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -36,6 +36,7 @@
     <module>integration</module>
     <module>pulsar-kafka-compat-client-test</module>
     <module>pulsar-storm-test</module>
+    <module>pulsar-spark-test</module>
   </modules>
   <build>
     <plugins>
diff --git a/tests/pulsar-spark-test/pom.xml b/tests/pulsar-spark-test/pom.xml
new file mode 100644
index 0000000000..fae0bda937
--- /dev/null
+++ b/tests/pulsar-spark-test/pom.xml
@@ -0,0 +1,81 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.pulsar.tests</groupId>
+        <artifactId>tests-parent</artifactId>
+        <version>2.3.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>pulsar-spark-test</artifactId>
+    <packaging>jar</packaging>
+    <name>Spark Streaming Pulsar Receivers Tests</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-spark</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-broker</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-broker</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>managed-ledger-original</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.10</artifactId>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/pulsar-spark/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
similarity index 100%
rename from pulsar-spark/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
rename to tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
diff --git a/pulsar-spark/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
similarity index 100%
rename from pulsar-spark/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
rename to tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message