From commits-return-17642-archive-asf-public=cust-asf.ponee.io@pulsar.apache.org Sat Nov 17 06:28:54 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9A4DD180658 for ; Sat, 17 Nov 2018 06:28:52 +0100 (CET) Received: (qmail 21668 invoked by uid 500); 17 Nov 2018 05:28:46 -0000 Mailing-List: contact commits-help@pulsar.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.apache.org Delivered-To: mailing list commits@pulsar.apache.org Received: (qmail 20929 invoked by uid 99); 17 Nov 2018 05:28:46 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 17 Nov 2018 05:28:46 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] jerrypeng closed pull request #2994: cleaning up and improving function metrics Message-ID: <154243252485.21428.18410654438499164305.gitbox@gitbox.apache.org> Date: Sat, 17 Nov 2018 05:28:44 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit jerrypeng closed pull request #2994: cleaning up and improving function metrics URL: https://github.com/apache/pulsar/pull/2994 This is a PR merged from a forked repository. 
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml index 71567cd9eb..72ee09b4d8 100644 --- a/distribution/server/src/assemble/bin.xml +++ b/distribution/server/src/assemble/bin.xml @@ -88,11 +88,6 @@ java-instance.jar instances - - ${basedir}/../../pulsar-functions/metrics/target/PrometheusMetricsServer.jar - PrometheusMetricsServer.jar - instances - ${basedir}/../../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar api-examples.jar diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 61ecac40ba..6f7e43e899 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -37,6 +37,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.worker.WorkerService; @@ -164,7 +165,7 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tena @GET @ApiOperation( - value = "Displays the status of a Pulsar Function running in cluster mode", + value = "Displays the status of a Pulsar Function", response = FunctionStatus.class ) @ApiResponses(value = { @@ -179,6 +180,40 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant, tenant, namespace, functionName, 
FunctionsImpl.FUNCTION, uri.getRequestUri()); } + @GET + @ApiOperation( + value = "Displays the stats of a Pulsar Function", + response = FunctionStats.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + @Path("/{tenant}/{namespace}/{functionName}/stats") + public Response getFunctionStats(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) throws IOException { + return functions.getFunctionStats(tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri()); + } + + @GET + @ApiOperation( + value = "Displays the stats of a Pulsar Function instance", + response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats") + public Response getFunctionInstanceStats(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) throws IOException { + return functions.getFunctionsInstanceStats( + tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri()); + } + @GET @ApiOperation( value = "Lists all Pulsar Functions currently deployed in a given namespace", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java index 1a9c7ad10a..a0ec3fe5bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java @@ -54,7 +54,7 @@ public WorkerService get() { @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List") @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") }) public Collection getMetrics() throws Exception { - return worker.getWorkerMetrcis(clientAppId()); + return worker.getWorkerMetrics(clientAppId()); } @GET diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 270dc8d6b4..f909557b8b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; import org.apache.pulsar.common.functions.FunctionConfig; @@ -198,6 +199,39 @@ FunctionStatus getFunctionStatus(String tenant, String namespace, String function, int id) throws PulsarAdminException; + /** + * Gets the current stats of a function instance. 
+ * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * @param id + * Function instance-id + * @return + * @throws PulsarAdminException + */ + FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionStats(String tenant, String namespace, String function, int id) + throws PulsarAdminException; + + /** + * Gets the current stats of a function. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * @return + * @throws PulsarAdminException + */ + + FunctionStats getFunctionStats(String tenant, String namespace, String function) + throws PulsarAdminException; + /** * Restart function instance * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 20ea74bee8..753955e8de 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -22,38 +22,36 @@ import com.google.protobuf.AbstractMessage.Builder; import com.google.protobuf.MessageOrBuilder; import com.google.protobuf.util.JsonFormat; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.StandardCopyOption; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import javax.ws.rs.ClientErrorException; -import javax.ws.rs.client.Entity; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.GenericType; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - import lombok.extern.slf4j.Slf4j; - import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdminException; import 
org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; -import org.apache.pulsar.common.functions.FunctionConfig; -import org.apache.pulsar.common.functions.WorkerInfo; import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.glassfish.jersey.media.multipart.file.FileDataBodyPart; +import javax.ws.rs.ClientErrorException; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.StandardCopyOption; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + @Slf4j public class FunctionsImpl extends BaseResource implements Functions { @@ -126,6 +124,33 @@ public FunctionStatus getFunctionStatus( } } + @Override + public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionStats(String tenant, String namespace, String function, int id) throws PulsarAdminException { + try { + Response response = request( + functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats")).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + return response.readEntity(FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + 
public FunctionStats getFunctionStats(String tenant, String namespace, String function) throws PulsarAdminException { + try { + Response response = request( + functions.path(tenant).path(namespace).path(function).path("stats")).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + return response.readEntity(FunctionStats.class); + } catch (Exception e) { + throw getApiException(e); + } } + @Override public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException { try { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 0524a657ab..28f9183930 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -76,6 +76,8 @@ private final UpdateFunction updater; private final GetFunction getter; private final GetFunctionStatus functionStatus; + @Getter + private final GetFunctionStats functionStats; private final RestartFunction restart; private final StopFunction stop; private final ListFunctions lister; @@ -638,6 +640,24 @@ void runCmd() throws Exception { } } + @Parameters(commandDescription = "Get the current stats of a Pulsar Function") + class GetFunctionStats extends FunctionCommand { + + @Parameter(names = "--instance-id", description = "The function instanceId (Get-status of all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + if (isBlank(instanceId)) { + System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, namespace, functionName))); + } else { + System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, namespace, functionName, 
Integer.parseInt(instanceId)))); + } + } + } + @Parameters(commandDescription = "Restart function instance") class RestartFunction extends FunctionCommand { @@ -878,6 +898,7 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException { updater = new UpdateFunction(); getter = new GetFunction(); functionStatus = new GetFunctionStatus(); + functionStats = new GetFunctionStats(); lister = new ListFunctions(); stateGetter = new StateGetter(); triggerer = new TriggerFunction(); @@ -893,6 +914,7 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException { jcommander.addCommand("restart", getRestarter()); jcommander.addCommand("stop", getStopper()); jcommander.addCommand("getstatus", getStatuser()); + jcommander.addCommand("stats", getFunctionStats()); jcommander.addCommand("list", getLister()); jcommander.addCommand("querystate", getStateGetter()); jcommander.addCommand("trigger", getTriggerer()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java new file mode 100644 index 0000000000..99e52e53b0 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import lombok.Data; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +@Data +public class FunctionStats { + + /** + * Overall total number of records function received from source + **/ + public long receivedTotal; + + /** + * Overall total number of records successfully processed by user function + **/ + public long processedSuccessfullyTotal; + + /** + * Overall total number of system exceptions thrown + **/ + public long systemExceptionsTotal; + + /** + * Overall total number of user exceptions thrown + **/ + public long userExceptionsTotal; + + /** + * Average process latency for function + **/ + public double avgProcessLatency; + + /** + * Timestamp of when the function was last invoked by any instance + **/ + public long lastInvocation; + + @Data + public static class FunctionInstanceStats { + + /** Instance Id of function instance **/ + public int instanceId; + + @Data + public static class FunctionInstanceStatsData { + + /** + * Total number of records function received from source for instance + **/ + public long receivedTotal; + + /** + * Total number of records successfully processed by user function for instance + **/ + public long processedSuccessfullyTotal; + + /** + * Total number of system exceptions thrown for instance + **/ + public long systemExceptionsTotal; + + /** + * Total number of user exceptions thrown for instance + **/ + public long userExceptionsTotal; + + /** + * Average process latency for function for instance + **/ + public double avgProcessLatency; + + /** + * Timestamp of when the function was last invoked for instance + **/ + public long lastInvocation; + + /** + * Map of user defined metrics + **/ + public Map userMetrics = new HashMap<>(); + } + + public 
FunctionInstanceStatsData metrics = new FunctionInstanceStatsData(); + } + + public List instances = new LinkedList<>(); + + public void addInstance(FunctionInstanceStats functionInstanceStats) { + instances.add(functionInstanceStats); + } + + public FunctionStats calculateOverall() { + + lastInvocation = 0; + instances.forEach(new Consumer() { + @Override + public void accept(FunctionInstanceStats functionInstanceStats) { + FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData = functionInstanceStats.getMetrics(); + receivedTotal += functionInstanceStatsData.receivedTotal; + processedSuccessfullyTotal += functionInstanceStatsData.processedSuccessfullyTotal; + systemExceptionsTotal += functionInstanceStatsData.systemExceptionsTotal; + userExceptionsTotal += functionInstanceStatsData.userExceptionsTotal; + avgProcessLatency += functionInstanceStatsData.avgProcessLatency; + if (functionInstanceStatsData.lastInvocation > lastInvocation) { + lastInvocation = functionInstanceStatsData.lastInvocation; + } + + } + }); + // calculate average from sum + avgProcessLatency = avgProcessLatency / instances.size(); + return this; + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index d13964df34..06048795a7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -50,11 +50,10 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkState; +import static 
org.apache.pulsar.functions.instance.FunctionStatsManager.USER_METRIC_PREFIX; /** * This class implements the Context interface exposed to the user. @@ -67,35 +66,6 @@ // Per Message related private Record record; - @Getter - @Setter - private class AccumulatedMetricDatum { - private double count; - private double sum; - private double max; - private double min; - - AccumulatedMetricDatum() { - count = 0; - sum = 0; - max = Double.MIN_VALUE; - min = Double.MAX_VALUE; - } - - public void update(double value) { - count++; - sum += value; - if (max < value) { - max = value; - } - if (min > value) { - min = value; - } - } - } - - private ConcurrentMap accumulatedMetrics; - private Map> publishProducers; private ProducerBuilderImpl producerBuilder; @@ -118,15 +88,14 @@ public void update(double value) { private final static String[] userMetricsLabelNames; static { // add label to indicate user metric - userMetricsLabelNames = Arrays.copyOf(FunctionStats.metricsLabelNames, FunctionStats.metricsLabelNames.length + 1); - userMetricsLabelNames[FunctionStats.metricsLabelNames.length] = "metric"; + userMetricsLabelNames = Arrays.copyOf(FunctionStatsManager.metricsLabelNames, FunctionStatsManager.metricsLabelNames.length + 1); + userMetricsLabelNames[FunctionStatsManager.metricsLabelNames.length] = "metric"; } public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List inputTopics, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels) { this.config = config; this.logger = logger; - this.accumulatedMetrics = new ConcurrentHashMap<>(); this.publishProducers = new HashMap<>(); this.inputTopics = inputTopics; this.topicSchema = new TopicSchema(client); @@ -342,40 +311,43 @@ public ByteBuffer getState(String key) { @Override public void recordMetric(String metricName, double value) { - userMetricsLabels.computeIfAbsent(metricName, - s -> { - String[] userMetricLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 
1); - userMetricLabels[userMetricLabels.length - 1] = metricName; - return userMetricLabels; - }); - - userMetricsSummary.labels(userMetricsLabels.get(metricName)).observe(value); - accumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum()); - accumulatedMetrics.get(metricName).update(value); + String[] userMetricLabels = userMetricsLabels.get(metricName); + if (userMetricLabels == null) { + userMetricLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1); + userMetricLabels[userMetricLabels.length - 1] = metricName; + // set label for metrics before putting into userMetricsLabels map to + // prevent race condition with getMetrics calls + userMetricsSummary.labels(userMetricLabels).observe(value); + userMetricsLabels.put(metricName, userMetricLabels); + } else { + userMetricsSummary.labels(userMetricLabels).observe(value); + } } - public MetricsData getAndResetMetrics() { - MetricsData retval = getMetrics(); + public Map getAndResetMetrics() { + Map retval = getMetrics(); resetMetrics(); return retval; } public void resetMetrics() { userMetricsSummary.clear(); - this.accumulatedMetrics.clear(); } - public MetricsData getMetrics() { - MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder(); - for (String metricName : accumulatedMetrics.keySet()) { - MetricsData.DataDigest.Builder bldr = MetricsData.DataDigest.newBuilder(); - bldr.setSum(accumulatedMetrics.get(metricName).getSum()); - bldr.setCount(accumulatedMetrics.get(metricName).getCount()); - bldr.setMax(accumulatedMetrics.get(metricName).getMax()); - bldr.setMin(accumulatedMetrics.get(metricName).getMax()); - metricsDataBuilder.putMetrics(metricName, bldr.build()); + public Map getMetrics() { + Map metricsMap = new HashMap<>(); + for (Map.Entry userMetricsLabelsEntry : userMetricsLabels.entrySet()) { + String metricName = userMetricsLabelsEntry.getKey(); + String[] labels = userMetricsLabelsEntry.getValue(); + Summary.Child.Value summary = 
userMetricsSummary.labels(labels).get(); + metricsMap.put(String.format("%s%s_sum", USER_METRIC_PREFIX, metricName), summary.sum); + metricsMap.put(String.format("%s%s_count", USER_METRIC_PREFIX, metricName), summary.count); + for (Map.Entry entry : summary.quantiles.entrySet()) { + Double quantile = entry.getKey(); + Double value = entry.getValue(); + metricsMap.put(String.format("%s%s_%s", USER_METRIC_PREFIX, metricName, quantile), value); + } } - MetricsData retval = metricsDataBuilder.build(); - return retval; + return metricsMap; } } \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java similarity index 79% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java index c45699722e..388efb14b2 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java @@ -34,18 +34,21 @@ @Slf4j @Getter @Setter -public class FunctionStats { +public class FunctionStatsManager { static final String[] metricsLabelNames = {"tenant", "namespace", "function", "instance_id", "cluster"}; + public static final String PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"; + public final static String USER_METRIC_PREFIX = "user_metric_"; + /** Declare metric names **/ - static final String PULSAR_FUNCTION_PROCESSED_TOTAL = "pulsar_function_processed_total"; - static final String PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL = "pulsar_function_processed_successfully_total"; - static final String PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL = "pulsar_function_system_exceptions_total"; - static final 
String PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL = "pulsar_function_user_exceptions_total"; - static final String PULSAR_FUNCTION_PROCESS_LATENCY_MS = "pulsar_function_process_latency_ms"; - static final String PULSAR_FUNCTION_LAST_INVOCATION = "pulsar_function_last_invocation"; - static final String PULSAR_FUNCTION_RECEIVED_TOTAL = "pulsar_function_received_total"; + public static final String PROCESSED_TOTAL = "processed_total"; + public static final String PROCESSED_SUCCESSFULLY_TOTAL = "processed_successfully_total"; + public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total"; + public static final String USER_EXCEPTIONS_TOTAL = "user_exceptions_total"; + public static final String PROCESS_LATENCY_MS = "process_latency_ms"; + public static final String LAST_INVOCATION = "last_invocation"; + public static final String RECEIVED_TOTAL = "received_total"; /** Declare Prometheus stats **/ @@ -70,37 +73,37 @@ @Getter private EvictingQueue latestSystemExceptions = EvictingQueue.create(10); - public FunctionStats(CollectorRegistry collectorRegistry) { + public FunctionStatsManager(CollectorRegistry collectorRegistry) { // Declare function local collector registry so that it will not clash with other function instances' // metrics collection especially in threaded mode functionCollectorRegistry = new CollectorRegistry(); statTotalProcessed = Counter.build() - .name(PULSAR_FUNCTION_PROCESSED_TOTAL) + .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL) .help("Total number of messages processed.") .labelNames(metricsLabelNames) .register(collectorRegistry); statTotalProcessedSuccessfully = Counter.build() - .name(PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL) + .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL) .help("Total number of messages processed successfully.") .labelNames(metricsLabelNames) .register(collectorRegistry); statTotalSysExceptions = Counter.build() - .name(PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL) + 
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL) .help("Total number of system exceptions.") .labelNames(metricsLabelNames) .register(collectorRegistry); statTotalUserExceptions = Counter.build() - .name(PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL) + .name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL) .help("Total number of user exceptions.") .labelNames(metricsLabelNames) .register(collectorRegistry); statProcessLatency = Summary.build() - .name(PULSAR_FUNCTION_PROCESS_LATENCY_MS) + .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS) .help("Process latency in milliseconds.") .quantile(0.5, 0.01) .quantile(0.9, 0.01) @@ -110,13 +113,13 @@ public FunctionStats(CollectorRegistry collectorRegistry) { .register(collectorRegistry); statlastInvocation = Gauge.build() - .name(PULSAR_FUNCTION_LAST_INVOCATION) + .name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION) .help("The timestamp of the last invocation of the function") .labelNames(metricsLabelNames) .register(collectorRegistry); statTotalRecordsRecieved = Counter.build() - .name(PULSAR_FUNCTION_RECEIVED_TOTAL) + .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL) .help("Total number of messages received from source.") .labelNames(metricsLabelNames) .register(collectorRegistry); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 39acdb57c9..acd0781161 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -26,6 +26,8 @@ import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.proto.InstanceCommunication; +import java.util.Map; + /** * This is the Java Instance. 
This is started by the runtimeSpawner using the JavaInstanceClient * program if invoking via a process based invocation or using JavaInstance using a thread @@ -74,7 +76,7 @@ public JavaExecutionResult handleMessage(Record record, Object input) { public void close() { } - public InstanceCommunication.MetricsData getAndResetMetrics() { + public Map getAndResetMetrics() { return context.getAndResetMetrics(); } @@ -82,7 +84,7 @@ public void resetMetrics() { context.resetMetrics(); } - public InstanceCommunication.MetricsData getMetrics() { + public Map getMetrics() { return context.getMetrics(); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index ab900ac40c..8ede5515fe 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -109,7 +109,7 @@ private Throwable deathException; // function stats - private final FunctionStats stats; + private final FunctionStatsManager stats; private Record currentRecord; @@ -133,7 +133,7 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig, this.jarFile = jarFile; this.client = (PulsarClientImpl) pulsarClient; this.stateStorageServiceUrl = stateStorageServiceUrl; - this.stats = new FunctionStats(collectorRegistry); + this.stats = new FunctionStatsManager(collectorRegistry); this.secretsProvider = secretsProvider; this.collectorRegistry = collectorRegistry; this.metricsLabels = new String[]{ @@ -423,23 +423,17 @@ public void close() { } public InstanceCommunication.MetricsData getAndResetMetrics() { - InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder(); + InstanceCommunication.MetricsData metricsData = getMetrics(); stats.reset(); - if (javaInstance != null) { - 
InstanceCommunication.MetricsData userMetrics = javaInstance.getAndResetMetrics(); - if (userMetrics != null) { - bldr.putAllMetrics(userMetrics.getMetricsMap()); - } - } - return bldr.build(); + return metricsData; } public InstanceCommunication.MetricsData getMetrics() { InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder(); if (javaInstance != null) { - InstanceCommunication.MetricsData userMetrics = javaInstance.getMetrics(); + Map userMetrics = javaInstance.getMetrics(); if (userMetrics != null) { - bldr.putAllMetrics(userMetrics.getMetricsMap()); + bldr.putAllUserMetrics(userMetrics); } } return bldr.build(); @@ -452,16 +446,16 @@ public void resetMetrics() { private Builder createMetricsDataBuilder() { InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder(); - addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_TOTAL, stats.statTotalProcessed.labels(metricsLabels).get(), bldr); - addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL, stats.statTotalProcessedSuccessfully.labels(metricsLabels).get(), bldr); - addSystemMetrics(FunctionStats.PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL, stats.statTotalSysExceptions.labels(metricsLabels).get(), bldr); - addSystemMetrics(FunctionStats.PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL, stats.statTotalUserExceptions.labels(metricsLabels).get(), bldr); - addSystemMetrics(FunctionStats.PULSAR_FUNCTION_RECEIVED_TOTAL, stats.statTotalRecordsRecieved.labels(metricsLabels).get(), bldr); - addSystemMetrics(FunctionStats.PULSAR_FUNCTION_PROCESS_LATENCY_MS, - stats.statProcessLatency.labels(metricsLabels).get().count <= 0.0 - ? 
0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count, - bldr); - addSystemMetrics(FunctionStats.PULSAR_FUNCTION_LAST_INVOCATION, stats.statlastInvocation.labels(metricsLabels).get(), bldr); + + bldr.setProcessedTotal((long) stats.statTotalProcessed.labels(metricsLabels).get()); + bldr.setProcessedSuccessfullyTotal((long) stats.statTotalProcessedSuccessfully.labels(metricsLabels).get()); + bldr.setSystemExceptionsTotal((long) stats.statTotalSysExceptions.labels(metricsLabels).get()); + bldr.setUserExceptionsTotal((long) stats.statTotalUserExceptions.labels(metricsLabels).get()); + bldr.setReceivedTotal((long) stats.statTotalRecordsRecieved.labels(metricsLabels).get()); + bldr.setAvgProcessLatency(stats.statProcessLatency.labels(metricsLabels).get().count <= 0.0 + ? 0 : stats.statProcessLatency.labels(metricsLabels).get().sum / stats.statProcessLatency.labels(metricsLabels).get().count); + bldr.setLastInvocation((long) stats.statlastInvocation.labels(metricsLabels).get()); + return bldr; } @@ -485,13 +479,6 @@ private Builder createMetricsDataBuilder() { return functionStatusBuilder; } - private static void addSystemMetrics(String metricName, double value, InstanceCommunication.MetricsData.Builder bldr) { - InstanceCommunication.MetricsData.DataDigest digest = - InstanceCommunication.MetricsData.DataDigest.newBuilder() - .setCount(value).setSum(value).setMax(value).setMin(0).build(); - bldr.putMetrics(metricName, digest); - } - private void setupLogHandler() { if (instanceConfig.getFunctionDetails().getLogTopic() != null && !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) { diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py index f33f4f45d9..6c843dccd9 100644 --- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py +++ 
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py @@ -39,7 +39,7 @@ name='InstanceCommunication.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xd8\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x12#\n\x07metrics\x18\x0f \x01(\x0b\x32\x12.proto.MetricsData\x12\x10\n\x08workerId\x18\x10 \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01 \x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01 \x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 
\x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\"\x98\x01\n\x07Metrics\x12/\n\x07metrics\x18\x01 \x03(\x0b\x32\x1e.proto.Metrics.InstanceMetrics\x1a\\\n\x0fInstanceMetrics\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\x12\'\n\x0bmetricsData\x18\x03 \x01(\x0b\x32\x12.proto.MetricsData2\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3') + serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xb3\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x12\x10\n\x08workerId\x18\x10 
\x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"V\n\x12\x46unctionStatusList\x12\r\n\x05\x65rror\x18\x02 \x01(\t\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xbd\x02\n\x0bMetricsData\x12\x15\n\rreceivedTotal\x18\x02 \x01(\x03\x12\x16\n\x0eprocessedTotal\x18\x03 \x01(\x03\x12\"\n\x1aprocessedSuccessfullyTotal\x18\x04 \x01(\x03\x12\x1d\n\x15systemExceptionsTotal\x18\x05 \x01(\x03\x12\x1b\n\x13userExceptionsTotal\x18\x06 \x01(\x03\x12\x19\n\x11\x61vgProcessLatency\x18\x07 \x01(\x01\x12\x16\n\x0elastInvocation\x18\x08 \x01(\x03\x12\x38\n\x0buserMetrics\x18\t \x03(\x0b\x32#.proto.MetricsData.UserMetricsEntry\x1a\x32\n\x10UserMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x01:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\"\x98\x01\n\x07Metrics\x12/\n\x07metrics\x18\x01 \x03(\x0b\x32\x1e.proto.Metrics.InstanceMetrics\x1a\\\n\x0fInstanceMetrics\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\x12\'\n\x0bmetricsData\x18\x03 \x01(\x0b\x32\x12.proto.MetricsData2\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3') , dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,]) @@ -79,8 +79,8 @@ extension_ranges=[], oneofs=[ ], - 
serialized_start=661, - serialized_end=730, + serialized_start=624, + serialized_end=693, ) _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor( @@ -116,8 +116,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=732, - serialized_end=796, + serialized_start=695, + serialized_end=759, ) _FUNCTIONSTATUS = _descriptor.Descriptor( @@ -226,14 +226,7 @@ is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='metrics', full_name='proto.FunctionStatus.metrics', index=14, - number=15, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='workerId', full_name='proto.FunctionStatus.workerId', index=15, + name='workerId', full_name='proto.FunctionStatus.workerId', index=14, number=16, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -252,7 +245,7 @@ oneofs=[ ], serialized_start=68, - serialized_end=796, + serialized_end=759, ) @@ -264,7 +257,14 @@ containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='functionStatusList', full_name='proto.FunctionStatusList.functionStatusList', index=0, + name='error', full_name='proto.FunctionStatusList.error', index=0, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='functionStatusList', full_name='proto.FunctionStatusList.functionStatusList', index=1, number=1, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, @@ -282,109 +282,107 @@ 
extension_ranges=[], oneofs=[ ], - serialized_start=798, - serialized_end=869, + serialized_start=761, + serialized_end=847, ) -_METRICSDATA_DATADIGEST = _descriptor.Descriptor( - name='DataDigest', - full_name='proto.MetricsData.DataDigest', +_METRICSDATA_USERMETRICSENTRY = _descriptor.Descriptor( + name='UserMetricsEntry', + full_name='proto.MetricsData.UserMetricsEntry', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='count', full_name='proto.MetricsData.DataDigest.count', index=0, - number=1, type=1, cpp_type=5, label=1, - has_default_value=False, default_value=float(0), + name='key', full_name='proto.MetricsData.UserMetricsEntry.key', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='sum', full_name='proto.MetricsData.DataDigest.sum', index=1, + name='value', full_name='proto.MetricsData.UserMetricsEntry.value', index=1, number=2, type=1, cpp_type=5, label=1, has_default_value=False, default_value=float(0), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='max', full_name='proto.MetricsData.DataDigest.max', index=2, - number=3, type=1, cpp_type=5, label=1, - has_default_value=False, default_value=float(0), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='min', full_name='proto.MetricsData.DataDigest.min', index=3, - number=4, type=1, cpp_type=5, label=1, - has_default_value=False, default_value=float(0), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), 
], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')), is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=937, - serialized_end=1003, + serialized_start=1117, + serialized_end=1167, ) -_METRICSDATA_METRICSENTRY = _descriptor.Descriptor( - name='MetricsEntry', - full_name='proto.MetricsData.MetricsEntry', +_METRICSDATA = _descriptor.Descriptor( + name='MetricsData', + full_name='proto.MetricsData', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='key', full_name='proto.MetricsData.MetricsEntry.key', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), + name='receivedTotal', full_name='proto.MetricsData.receivedTotal', index=0, + number=2, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='value', full_name='proto.MetricsData.MetricsEntry.value', index=1, - number=2, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, + name='processedTotal', full_name='proto.MetricsData.processedTotal', index=1, + number=3, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')), - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=1005, - serialized_end=1082, -) - -_METRICSDATA = _descriptor.Descriptor( - name='MetricsData', - full_name='proto.MetricsData', - filename=None, - 
file=DESCRIPTOR, - containing_type=None, - fields=[ _descriptor.FieldDescriptor( - name='metrics', full_name='proto.MetricsData.metrics', index=0, - number=1, type=11, cpp_type=10, label=3, + name='processedSuccessfullyTotal', full_name='proto.MetricsData.processedSuccessfullyTotal', index=2, + number=4, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='systemExceptionsTotal', full_name='proto.MetricsData.systemExceptionsTotal', index=3, + number=5, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='userExceptionsTotal', full_name='proto.MetricsData.userExceptionsTotal', index=4, + number=6, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='avgProcessLatency', full_name='proto.MetricsData.avgProcessLatency', index=5, + number=7, type=1, cpp_type=5, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='lastInvocation', full_name='proto.MetricsData.lastInvocation', index=6, + number=8, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='userMetrics', full_name='proto.MetricsData.userMetrics', index=7, + 
number=9, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -392,7 +390,7 @@ ], extensions=[ ], - nested_types=[_METRICSDATA_DATADIGEST, _METRICSDATA_METRICSENTRY, ], + nested_types=[_METRICSDATA_USERMETRICSENTRY, ], enum_types=[ ], options=None, @@ -401,8 +399,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=872, - serialized_end=1082, + serialized_start=850, + serialized_end=1167, ) @@ -432,8 +430,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1084, - serialized_end=1120, + serialized_start=1169, + serialized_end=1205, ) @@ -477,8 +475,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1183, - serialized_end=1275, + serialized_start=1268, + serialized_end=1360, ) _METRICS = _descriptor.Descriptor( @@ -507,8 +505,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1123, - serialized_end=1275, + serialized_start=1208, + serialized_end=1360, ) _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS @@ -516,12 +514,9 @@ _FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION _FUNCTIONSTATUS.fields_by_name['latestSystemExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION _FUNCTIONSTATUS.fields_by_name['deserializationExceptions'].message_type = _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY -_FUNCTIONSTATUS.fields_by_name['metrics'].message_type = _METRICSDATA _FUNCTIONSTATUSLIST.fields_by_name['functionStatusList'].message_type = _FUNCTIONSTATUS -_METRICSDATA_DATADIGEST.containing_type = _METRICSDATA -_METRICSDATA_METRICSENTRY.fields_by_name['value'].message_type = _METRICSDATA_DATADIGEST -_METRICSDATA_METRICSENTRY.containing_type = _METRICSDATA -_METRICSDATA.fields_by_name['metrics'].message_type = _METRICSDATA_METRICSENTRY +_METRICSDATA_USERMETRICSENTRY.containing_type = _METRICSDATA 
+_METRICSDATA.fields_by_name['userMetrics'].message_type = _METRICSDATA_USERMETRICSENTRY _METRICS_INSTANCEMETRICS.fields_by_name['metricsData'].message_type = _METRICSDATA _METRICS_INSTANCEMETRICS.containing_type = _METRICS _METRICS.fields_by_name['metrics'].message_type = _METRICS_INSTANCEMETRICS @@ -564,17 +559,10 @@ MetricsData = _reflection.GeneratedProtocolMessageType('MetricsData', (_message.Message,), dict( - DataDigest = _reflection.GeneratedProtocolMessageType('DataDigest', (_message.Message,), dict( - DESCRIPTOR = _METRICSDATA_DATADIGEST, - __module__ = 'InstanceCommunication_pb2' - # @@protoc_insertion_point(class_scope:proto.MetricsData.DataDigest) - )) - , - - MetricsEntry = _reflection.GeneratedProtocolMessageType('MetricsEntry', (_message.Message,), dict( - DESCRIPTOR = _METRICSDATA_METRICSENTRY, + UserMetricsEntry = _reflection.GeneratedProtocolMessageType('UserMetricsEntry', (_message.Message,), dict( + DESCRIPTOR = _METRICSDATA_USERMETRICSENTRY, __module__ = 'InstanceCommunication_pb2' - # @@protoc_insertion_point(class_scope:proto.MetricsData.MetricsEntry) + # @@protoc_insertion_point(class_scope:proto.MetricsData.UserMetricsEntry) )) , DESCRIPTOR = _METRICSDATA, @@ -582,8 +570,7 @@ # @@protoc_insertion_point(class_scope:proto.MetricsData) )) _sym_db.RegisterMessage(MetricsData) -_sym_db.RegisterMessage(MetricsData.DataDigest) -_sym_db.RegisterMessage(MetricsData.MetricsEntry) +_sym_db.RegisterMessage(MetricsData.UserMetricsEntry) HealthCheckResult = _reflection.GeneratedProtocolMessageType('HealthCheckResult', (_message.Message,), dict( DESCRIPTOR = _HEALTHCHECKRESULT, @@ -612,8 +599,8 @@ DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\025InstanceCommunication')) _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY.has_options = True _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) 
-_METRICSDATA_METRICSENTRY.has_options = True -_METRICSDATA_METRICSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) +_METRICSDATA_USERMETRICSENTRY.has_options = True +_METRICSDATA_USERMETRICSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) _INSTANCECONTROL = _descriptor.ServiceDescriptor( name='InstanceControl', @@ -621,8 +608,8 @@ file=DESCRIPTOR, index=0, options=None, - serialized_start=1278, - serialized_end=1626, + serialized_start=1363, + serialized_end=1711, methods=[ _descriptor.MethodDescriptor( name='GetFunctionStatus', diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 9a37d59ab1..381d612e48 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -29,27 +29,10 @@ import pulsar import util -import InstanceCommunication_pb2 from prometheus_client import Summary from function_stats import Stats -# For keeping track of accumulated metrics -class AccumulatedMetricDatum(object): - def __init__(self): - self.count = 0.0 - self.sum = 0.0 - self.max = float('-inf') - self.min = float('inf') - - def update(self, value): - self.count += 1 - self.sum += value - if value > self.max: - self.max = value - if value < self.min: - self.min = value - class ContextImpl(pulsar.Context): # add label to indicate user metric @@ -62,7 +45,6 @@ def __init__(self, instance_config, logger, pulsar_client, user_code, consumers, self.user_code_dir = os.path.dirname(user_code) self.consumers = consumers self.secrets_provider = secrets_provider - self.accumulated_metrics = {} self.publish_producers = {} self.publish_serializers = {} self.current_message_id = None @@ -132,9 +114,6 @@ def record_metric(self, metric_name, metric_value): if metric_name not in self.user_metrics_labels: self.user_metrics_labels[metric_name] = self.metrics_labels + 
[metric_name] self.user_metrics_summary.labels(*self.user_metrics_labels[metric_name]).observe(metric_value) - if not metric_name in self.accumulated_metrics: - self.accumulated_metrics[metric_name] = AccumulatedMetricDatum() - self.accumulated_metrics[metric_name].update(metric_value) def get_output_topic(self): return self.instance_config.function_details.output @@ -182,13 +161,11 @@ def reset_metrics(self): for labels in self.user_metrics_labels.values(): self.user_metrics_summary.labels(*labels)._sum.set(0.0) self.user_metrics_summary.labels(*labels)._count.set(0.0) - self.accumulated_metrics.clear() def get_metrics(self): - metrics = InstanceCommunication_pb2.MetricsData() - for metric_name, accumulated_metric in self.accumulated_metrics.items(): - metrics.metrics[metric_name].count = accumulated_metric.count - metrics.metrics[metric_name].sum = accumulated_metric.sum - metrics.metrics[metric_name].max = accumulated_metric.max - metrics.metrics[metric_name].min = accumulated_metric.min - return metrics + metrics_map = {} + for metric_name, metric_labels in self.user_metrics_labels.items(): + metrics_map["%s%s_sum" % (Stats.USER_METRIC_PREFIX, metric_name)] = self.user_metrics_summary.labels(*metric_labels)._sum.get() + metrics_map["%s%s_count" % (Stats.USER_METRIC_PREFIX, metric_name)] = self.user_metrics_summary.labels(*metric_labels)._count.get() + + return metrics_map diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py index 13b3f8442f..ff575098d7 100644 --- a/pulsar-functions/instance/src/main/python/function_stats.py +++ b/pulsar-functions/instance/src/main/python/function_stats.py @@ -26,28 +26,31 @@ class Stats(object): metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id', 'cluster'] - TOTAL_PROCESSED = 'pulsar_function_processed_total' - TOTAL_SUCCESSFULLY_PROCESSED = 'pulsar_function_processed_successfully_total' - TOTAL_SYSTEM_EXCEPTIONS = 
'pulsar_function_system_exceptions_total' - TOTAL_USER_EXCEPTIONS = 'pulsar_function_user_exceptions_total' - PROCESS_LATENCY_MS = 'pulsar_function_process_latency_ms' - LAST_INVOCATION = 'pulsar_function_last_invocation' - TOTAL_RECEIVED = 'pulsar_function_received_total' + PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_" + USER_METRIC_PREFIX = "user_metric_"; + + TOTAL_PROCESSED = 'processed_total' + TOTAL_SUCCESSFULLY_PROCESSED = 'processed_successfully_total' + TOTAL_SYSTEM_EXCEPTIONS = 'system_exceptions_total' + TOTAL_USER_EXCEPTIONS = 'user_exceptions_total' + PROCESS_LATENCY_MS = 'process_latency_ms' + LAST_INVOCATION = 'last_invocation' + TOTAL_RECEIVED = 'received_total' # Declare Prometheus - stat_total_processed = Counter(TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names) - stat_total_processed_successfully = Counter(TOTAL_SUCCESSFULLY_PROCESSED, + stat_total_processed = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_PROCESSED, 'Total number of messages processed.', metrics_label_names) + stat_total_processed_successfully = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_SUCCESSFULLY_PROCESSED, 'Total number of messages processed successfully.', metrics_label_names) - stat_total_sys_exceptions = Counter(TOTAL_SYSTEM_EXCEPTIONS, 'Total number of system exceptions.', + stat_total_sys_exceptions = Counter(PULSAR_FUNCTION_METRICS_PREFIX+ TOTAL_SYSTEM_EXCEPTIONS, 'Total number of system exceptions.', metrics_label_names) - stat_total_user_exceptions = Counter(TOTAL_USER_EXCEPTIONS, 'Total number of user exceptions.', + stat_total_user_exceptions = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_USER_EXCEPTIONS, 'Total number of user exceptions.', metrics_label_names) - stat_process_latency_ms = Summary(PROCESS_LATENCY_MS, 'Process latency in milliseconds.', metrics_label_names) + stat_process_latency_ms = Summary(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS, 'Process latency in milliseconds.', metrics_label_names) - 
stat_last_invocation = Gauge(LAST_INVOCATION, 'The timestamp of the last invocation of the function.', metrics_label_names) + stat_last_invocation = Gauge(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION, 'The timestamp of the last invocation of the function.', metrics_label_names) - stat_total_received = Counter(TOTAL_RECEIVED, 'Total number of messages received from source.', metrics_label_names) + stat_total_received = Counter(PULSAR_FUNCTION_METRICS_PREFIX + TOTAL_RECEIVED, 'Total number of messages received from source.', metrics_label_names) latest_user_exception = [] latest_sys_exception = [] diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 1d56a3984c..f05e7b2b88 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -286,20 +286,34 @@ def reset_metrics(self): self.contextimpl.reset_metrics() def get_metrics(self): - # First get any user metrics - metrics = self.contextimpl.get_metrics() - # Now add system metrics as well - self.add_system_metrics(Stats.TOTAL_PROCESSED, Stats.stat_total_processed.labels(*self.metrics_labels)._value.get(), metrics) - self.add_system_metrics(Stats.TOTAL_SUCCESSFULLY_PROCESSED, Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get(), metrics) - self.add_system_metrics(Stats.TOTAL_SYSTEM_EXCEPTIONS, Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get(), metrics) - self.add_system_metrics(Stats.TOTAL_USER_EXCEPTIONS, Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get(), metrics) - self.add_system_metrics(Stats.PROCESS_LATENCY_MS, - 0.0 if Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() <= 0.0 - else Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() / Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get(), - metrics) - 
self.add_system_metrics(Stats.TOTAL_RECEIVED, Stats.stat_total_received.labels(*self.metrics_labels)._value.get(), metrics) - self.add_system_metrics(Stats.LAST_INVOCATION, Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get(), metrics) - return metrics + + total_received = Stats.stat_total_received.labels(*self.metrics_labels)._value.get() + total_processed = Stats.stat_total_processed.labels(*self.metrics_labels)._value.get() + total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get() + total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get() + total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get() + process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() + process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() + last_invocation = Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get() + + metrics_data = InstanceCommunication_pb2.MetricsData() + + metrics_data.receivedTotal = int(total_received) if sys.version_info.major >= 3 else long(total_received) + metrics_data.processedTotal = int(total_processed) if sys.version_info.major >= 3 else long(total_processed) + metrics_data.processedSuccessfullyTotal = int(total_processed_successfully) if sys.version_info.major >= 3 else long(total_processed_successfully) + metrics_data.systemExceptionsTotal = int(total_sys_exceptions) if sys.version_info.major >= 3 else long(total_sys_exceptions) + metrics_data.userExceptionsTotal = int(total_user_exceptions) if sys.version_info.major >= 3 else long(total_user_exceptions) + metrics_data.avgProcessLatency = 0.0 \ + if process_latency_ms_count <= 0.0 \ + else process_latency_ms_sum / process_latency_ms_count + metrics_data.lastInvocation = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation) + + # get any 
user metrics + user_metrics = self.contextimpl.get_metrics() + for metric_name, value in user_metrics.items(): + metrics_data.userMetrics[metric_name] = value + + return metrics_data def add_system_metrics(self, metric_name, value, metrics): metrics.metrics[metric_name].count = value @@ -312,30 +326,30 @@ def get_function_status(self): status.running = True total_processed = Stats.stat_total_processed.labels(*self.metrics_labels)._value.get() - stat_total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get() - stat_total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get() - stat_total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get() - stat_process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() - stat_process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() - stat_last_invocation = Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get() - - status.numProcessed = int(total_processed) if sys.version_info.major >= 3 else long(total_processed) - status.numSuccessfullyProcessed = int(stat_total_processed_successfully) if sys.version_info.major >= 3 else long(stat_total_processed_successfully) - status.numUserExceptions = int(stat_total_user_exceptions) if sys.version_info.major >= 3 else long(stat_total_user_exceptions) + total_processed_successfully = Stats.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get() + total_user_exceptions = Stats.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get() + total_sys_exceptions = Stats.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get() + process_latency_ms_count = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._count.get() + process_latency_ms_sum = Stats.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get() + last_invocation = 
Stats.stat_last_invocation.labels(*self.metrics_labels)._value.get() + + status.numProcessed = int(total_processed) if sys.version_info.major >= 3 else long(total_processed) + status.numSuccessfullyProcessed = int(total_processed_successfully) if sys.version_info.major >= 3 else long(total_processed_successfully) + status.numUserExceptions = int(total_user_exceptions) if sys.version_info.major >= 3 else long(total_user_exceptions) status.instanceId = self.instance_config.instance_id for ex, tm in self.stats.latest_user_exception: to_add = status.latestUserExceptions.add() to_add.exceptionString = ex to_add.msSinceEpoch = tm - status.numSystemExceptions = int(stat_total_sys_exceptions) if sys.version_info.major >= 3 else long(stat_total_sys_exceptions) + status.numSystemExceptions = int(total_sys_exceptions) if sys.version_info.major >= 3 else long(total_sys_exceptions) for ex, tm in self.stats.latest_sys_exception: to_add = status.latestSystemExceptions.add() to_add.exceptionString = ex to_add.msSinceEpoch = tm status.averageLatency = 0.0 \ - if stat_process_latency_ms_count <= 0.0 \ - else stat_process_latency_ms_sum / stat_process_latency_ms_count - status.lastInvocationTime = int(stat_last_invocation) if sys.version_info.major >= 3 else long(stat_last_invocation) + if process_latency_ms_count <= 0.0 \ + else process_latency_ms_sum / process_latency_ms_count + status.lastInvocationTime = int(last_invocation) if sys.version_info.major >= 3 else long(last_invocation) return status def join(self): diff --git a/pulsar-functions/metrics/pom.xml b/pulsar-functions/metrics/pom.xml deleted file mode 100644 index ca5e0a5d55..0000000000 --- a/pulsar-functions/metrics/pom.xml +++ /dev/null @@ -1,88 +0,0 @@ - - - 4.0.0 - - - org.apache.pulsar - pulsar-functions - 2.3.0-SNAPSHOT - - - pulsar-functions-metrics - Pulsar Functions :: Metrics - - - - - org.apache.pulsar - pulsar-functions-proto - ${project.version} - - - - org.apache.pulsar - pulsar-functions-utils - 
${project.version} - - - - com.beust - jcommander - - - - - - - - org.apache.maven.plugins - maven-assembly-plugin - - - package - - single - - - - PrometheusMetricsServer - - - - - org.apache.pulsar.functions.sink.PrometheusMetricsServer - - - - - jar-with-dependencies - - false - - - - - - - - diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/MetricsSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/MetricsSink.java deleted file mode 100644 index d22bf6a316..0000000000 --- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/MetricsSink.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.metrics; - -import java.util.Map; - -import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; - -/** - * The metrics sink interface.

- * Implementations of this interface consume the {@link MetricsData} - * The calling entity pushes the {@link MetricsData} to the sink using - * {@link #processRecord(MetricsData)} method. - * And {@link #flush()} is called at an interval according to the configuration - */ -public interface MetricsSink extends AutoCloseable { - /** - * Initialize the MetricsSink - * - * @param conf An unmodifiableMap containing basic configuration - * Attempts to modify the returned map, - * whether direct or via its collection views, result in an UnsupportedOperationException. - */ - void init(Map conf); - - /** - * Process a metrics record in the sink - * @param record the record to put - * @param functionDetails functionDetails that generated this record - */ - void processRecord(MetricsData record, Function.FunctionDetails functionDetails); - - /** - * Flush any buffered metrics - * It would be called at an interval according to the configuration - */ - void flush(); - - /** - * Closes this stream and releases any system resources associated - * with it. If the stream is already closed then invoking this - * method has no effect. - */ - void close(); -} diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java deleted file mode 100644 index ef9f0cec63..0000000000 --- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.functions.metrics; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.Empty; -import com.google.protobuf.util.JsonFormat; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.functions.metrics.sink.AbstractWebSink; -import org.apache.pulsar.functions.metrics.sink.PrometheusSink; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.proto.InstanceControlGrpc; - -import java.util.HashMap; -import java.util.Map; -import java.util.TimerTask; -import java.util.concurrent.*; - -/** - * A function container implemented using java thread. 
- */ -@Slf4j -public class PrometheusMetricsServer { - @Parameter(names = "--function_details", description = "Function details json\n", required = true) - protected String functionDetailsJsonString; - - @Parameter(names = "--prometheus_port", description = "Port to listen for prometheus requests\n", required = true) - protected int prometheusPort; - - @Parameter(names = "--grpc_port", description = "GRPC Port to query the metrics from instance\n", required = true) - protected int grpc_port; - - @Parameter(names = "--collection_interval", description = "Number in seconds between collection interval\n", required = true) - protected int metricsCollectionInterval; - - private FunctionDetails functionDetails; - private MetricsSink metricsSink; - private ManagedChannel channel; - private InstanceControlGrpc.InstanceControlFutureStub stub; - private ScheduledExecutorService timer; - - public PrometheusMetricsServer() { } - - - public void start() throws Exception { - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - if (functionDetailsJsonString.charAt(0) == '\'') { - functionDetailsJsonString = functionDetailsJsonString.substring(1); - } - if (functionDetailsJsonString.charAt(functionDetailsJsonString.length() - 1) == '\'') { - functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1); - } - JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder); - functionDetails = functionDetailsBuilder.build(); - - metricsSink = new PrometheusSink(); - Map config = new HashMap<>(); - config.put(AbstractWebSink.KEY_PATH, "/metrics"); - config.put(AbstractWebSink.KEY_PORT, String.valueOf(prometheusPort)); - metricsSink.init(config); - - channel = ManagedChannelBuilder.forAddress("127.0.0.1", grpc_port) - .usePlaintext(true) - .build(); - stub = InstanceControlGrpc.newFutureStub(channel); - - if (metricsCollectionInterval > 0) { - timer = Executors.newSingleThreadScheduledExecutor(); - 
timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - CompletableFuture result = getMetrics(); - try { - metricsSink.processRecord(result.get(), functionDetails); - } catch (Exception e) { - log.error("Getting metrics data failed {}/{}/{}", - functionDetails.getTenant(), - functionDetails.getNamespace(), - functionDetails.getName(), - e); - } - } - }, metricsCollectionInterval, metricsCollectionInterval, TimeUnit.SECONDS); - } - } - - public CompletableFuture getMetrics() { - CompletableFuture retval = new CompletableFuture<>(); - ListenableFuture response = stub.withDeadlineAfter(10, TimeUnit.SECONDS).getAndResetMetrics(Empty.newBuilder().build()); - Futures.addCallback(response, new FutureCallback() { - @Override - public void onFailure(Throwable throwable) { - retval.completeExceptionally(throwable); - } - - @Override - public void onSuccess(InstanceCommunication.MetricsData t) { - retval.complete(t); - } - }); - return retval; - } - - public static void main(String[] args) throws Exception { - PrometheusMetricsServer server = new PrometheusMetricsServer(); - JCommander jcommander = new JCommander(server); - jcommander.setProgramName("PrometheusMetricsServer"); - - // parse args by JCommander - jcommander.parse(args); - server.start(); - } -} diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/package-info.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/package-info.java deleted file mode 100644 index 413e52d702..0000000000 --- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Spanwer for spawning processes, threads, docker containers to execute functions. - */ -package org.apache.pulsar.functions.metrics; \ No newline at end of file diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java deleted file mode 100644 index cb5541a462..0000000000 --- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pulsar.functions.metrics.sink; - -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Ticker; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; - -import com.sun.net.httpserver.HttpServer; - -import org.apache.pulsar.functions.metrics.MetricsSink; - - -/** - * A metrics sink that publishes metrics on a http endpoint - */ -abstract public class AbstractWebSink implements MetricsSink { - private static final Logger LOG = Logger.getLogger(AbstractWebSink.class.getName()); - - private static final int HTTP_STATUS_OK = 200; - - // Metrics will be published on http://host:port/path, the port - public static final String KEY_PORT = "port"; - - // The path - public static final String KEY_PATH = "path"; - - // Maximum number of metrics getting served - private static final String KEY_METRICS_CACHE_MAX_SIZE = "metrics-cache-max-size"; - private static final String DEFAULT_MAX_CACHE_SIZE = "1000000"; - - // Time To Live before a metric gets evicted from the cache - private static final String KEY_METRICS_CACHE_TTL_SEC = "metrics-cache-ttl-sec"; - private static final String DEFAULT_CACHE_TTL_SECONDS = "600"; - - private HttpServer httpServer; - private long cacheMaxSize; - private long cacheTtlSeconds; - private final Ticker cacheTicker; - - AbstractWebSink() { - this(Ticker.systemTicker()); - } - - @VisibleForTesting - AbstractWebSink(Ticker cacheTicker) { - this.cacheTicker = cacheTicker; - } - - @Override - public final void init(Map conf) { - String path = conf.get(KEY_PATH); - - cacheMaxSize = Long.valueOf(conf.getOrDefault(KEY_METRICS_CACHE_MAX_SIZE, - DEFAULT_MAX_CACHE_SIZE)); - - cacheTtlSeconds = 
Long.valueOf(conf.getOrDefault(KEY_METRICS_CACHE_TTL_SEC, - DEFAULT_CACHE_TTL_SECONDS)); - - // initialize child classes - initialize(conf); - - int port = Integer.valueOf(conf.getOrDefault(KEY_PORT, "9099")); - startHttpServer(path, port); - } - - /** - * Start a http server on supplied port that will serve the metrics, as json, - * on the specified path. - * - * @param path - * @param port - */ - protected void startHttpServer(String path, int port) { - LOG.info("Starting AbstractWebMetricSink at path" + path + " and port " + port); - try { - httpServer = HttpServer.create(new InetSocketAddress(port), 0); - httpServer.createContext(path, httpExchange -> { - byte[] response = generateResponse(); - httpExchange.sendResponseHeaders(HTTP_STATUS_OK, response.length); - OutputStream os = httpExchange.getResponseBody(); - os.write(response); - os.close(); - LOG.log(Level.INFO, "Received metrics request."); - }); - httpServer.start(); - } catch (IOException e) { - throw new RuntimeException("Failed to create Http server on port " + port, e); - } - } - - // a convenience method for creating a metrics cache - Cache createCache() { - return CacheBuilder.newBuilder() - .maximumSize(cacheMaxSize) - .expireAfterWrite(cacheTtlSeconds, TimeUnit.SECONDS) - .ticker(cacheTicker) - .build(); - } - - abstract byte[] generateResponse() throws IOException; - - abstract void initialize(Map configuration); - - @Override - public void flush() { } - - @Override - public void close() { - if (httpServer != null) { - httpServer.stop(0); - } - } -} diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/FileSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/FileSink.java deleted file mode 100644 index 64bc97d116..0000000000 --- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/FileSink.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.metrics.sink; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.PrintStream; -import java.io.UnsupportedEncodingException; -import java.util.Map; -import java.util.logging.Logger; - -import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.metrics.MetricsSink; -import org.apache.pulsar.functions.utils.FunctionDetailsUtils; -import org.apache.pulsar.functions.utils.Utils; - -/** - * A metrics sink that writes to a file in json format - * We would create/overwrite a file every time the flush() in invoked - * We would save at most fileMaximum metrics file in disk - */ -public class FileSink implements MetricsSink { - private static final Logger LOG = Logger.getLogger(FileSink.class.getName()); - - private static final String FILENAME_KEY = "filename-output"; - private static final String MAXIMUM_FILE_COUNT_KEY = "file-maximum"; - - // We would convert a file's metrics into a JSON object, i.e. 
array - // So we need to add "[" at the start and "]" at the end - private boolean isFileStart = true; - private PrintStream writer; - private String filenameKey; - private int fileMaximum = 1; - private int currentFileIndex = 0; - - @Override - public void init(Map conf) { - verifyConf(conf); - filenameKey = conf.get(FILENAME_KEY); - fileMaximum = Integer.valueOf(conf.get(MAXIMUM_FILE_COUNT_KEY)); - } - - private void verifyConf(Map conf) { - if (!conf.containsKey(FILENAME_KEY)) { - throw new IllegalArgumentException("Require: " + FILENAME_KEY); - } - if (!conf.containsKey(MAXIMUM_FILE_COUNT_KEY)) { - throw new IllegalArgumentException("Require: " + MAXIMUM_FILE_COUNT_KEY); - } - } - - private PrintStream openNewFile(String filename) { - // If the file already exists, set it Writable to avoid permission denied - File f = new File(filename); - if (f.exists() && !f.isDirectory()) { - f.setWritable(true); - } - - try { - return new PrintStream(new FileOutputStream(filename, false), true, "UTF-8"); - } catch (FileNotFoundException | UnsupportedEncodingException e) { - throw new RuntimeException("Error creating " + filename, e); - } - } - - @Override - public void processRecord(InstanceCommunication.MetricsData record, Function.FunctionDetails FunctionDetails) { - if (isFileStart) { - String filenamePrefix = filenameKey + "." + FunctionDetailsUtils.getFullyQualifiedName(FunctionDetails); - writer = openNewFile(String.format("%s.%d", filenamePrefix, currentFileIndex)); - - writer.print("["); - isFileStart = false; - } else { - writer.print(","); - } - - try { - String metrics = Utils.printJson(record); - writer.print(metrics); - } catch (Exception ex) { - } - } - - @Override - public void flush() { - if (isFileStart) { - // No record has been processed since the previous flush, so create a new file - // and output an empty JSON array. 
- writer = openNewFile(String.format("%s.%d", filenameKey, currentFileIndex)); - writer.print("["); - } - writer.print("]"); - writer.flush(); - writer.close(); - new File(String.format("%s.%s", filenameKey, currentFileIndex)).setReadOnly(); - - currentFileIndex = (currentFileIndex + 1) % fileMaximum; - - isFileStart = true; - } - - @Override - public void close() { - if (writer != null) { - writer.close(); - } - } -} \ No newline at end of file diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/PrometheusSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/PrometheusSink.java deleted file mode 100644 index 26aa940083..0000000000 --- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/PrometheusSink.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pulsar.functions.metrics.sink; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.logging.Logger; -import java.util.regex.Pattern; - -import com.google.common.cache.Cache; - -import lombok.Getter; -import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; -import org.apache.pulsar.functions.utils.FunctionDetailsUtils; - -/** - * A web sink that exposes and endpoint that Prometheus can scrape - * - * metrics are generated in a text format and separated with a newline "\n" - * https://prometheus.io/docs/instrumenting/exposition_formats - * - * metrics format: - * heron_metric{topology="topology-name",component="component-id",instance="instance-id"} value timestamp - */ -public class PrometheusSink extends AbstractWebSink { - private static final Logger LOG = Logger.getLogger(PrometheusSink.class.getName()); - - private static final String PREFIX = "pulsar_function"; - - private static final String DELIMITER = "\n"; - - // This is the cache that is used to serve the metrics - @Getter - private Cache> metricsCache; - - public PrometheusSink() { - super(); - } - - @Override - void initialize(Map configuration) { - metricsCache = createCache(); - } - - @Override - byte[] generateResponse() throws IOException { - metricsCache.cleanUp(); - final Map> metrics = metricsCache.asMap(); - final StringBuilder sb = new StringBuilder(); - - metrics.forEach((String source, Map sourceMetrics) -> { - String tenant = FunctionDetailsUtils.extractTenantFromFQN(source); - String namespace = FunctionDetailsUtils.extractNamespaceFromFQN(source); - String name = FunctionDetailsUtils.extractFunctionNameFromFQN(source); - - sourceMetrics.forEach((String metricName, Double value) -> { - - String exportedMetricName = String.format("%s_%s", PREFIX, metricName); - sb.append(Prometheus.sanitizeMetricName(exportedMetricName)) - .append("{") - 
.append("tenant=\"").append(tenant).append("\",") - .append("namespace=\"").append(namespace).append("\",") - .append("functionname=\"").append(name).append("\""); - - sb.append("} ") - .append(Prometheus.doubleToGoString(value)) - .append(" ").append(currentTimeMillis()) - .append(DELIMITER); - }); - }); - - return sb.toString().getBytes(); - } - - @Override - public void processRecord(MetricsData record, Function.FunctionDetails functionDetails) { - final String source = FunctionDetailsUtils.getFullyQualifiedName(functionDetails); - - Map sourceCache = metricsCache.getIfPresent(source); - if (sourceCache == null) { - final Cache newSourceCache = createCache(); - sourceCache = newSourceCache.asMap(); - } - - sourceCache.putAll(processMetrics(record)); - metricsCache.put(source, sourceCache); - } - - static Map processMetrics(MetricsData metrics) { - Map map = new HashMap<>(); - for (Map.Entry entry : metrics.getMetricsMap().entrySet()) { - map.put(entry.getKey() + "_count", entry.getValue().getCount()); - map.put(entry.getKey() + "_sum", entry.getValue().getSum()); - map.put(entry.getKey() + "_max", entry.getValue().getMax()); - map.put(entry.getKey() + "_min", entry.getValue().getMin()); - } - - return map; - } - - long currentTimeMillis() { - return System.currentTimeMillis(); - } - - // code taken from prometheus java_client repo - static final class Prometheus { - private static final Pattern METRIC_NAME_RE = Pattern.compile("[a-zA-Z_:][a-zA-Z0-9_:]*"); - private static final Pattern METRIC_LABEL_NAME_RE = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*"); - private static final Pattern RESERVED_METRIC_LABEL_NAME_RE = Pattern.compile("__.*"); - - /** - * Throw an exception if the metric name is invalid. 
- */ - static void checkMetricName(String name) { - if (!METRIC_NAME_RE.matcher(name).matches()) { - throw new IllegalArgumentException("Invalid metric name: " + name); - } - } - - private static final Pattern SANITIZE_PREFIX_PATTERN = Pattern.compile("^[^a-zA-Z_]"); - private static final Pattern SANITIZE_BODY_PATTERN = Pattern.compile("[^a-zA-Z0-9_]"); - - /** - * Sanitize metric name - */ - static String sanitizeMetricName(String metricName) { - return SANITIZE_BODY_PATTERN.matcher( - SANITIZE_PREFIX_PATTERN.matcher(metricName).replaceFirst("_") - ).replaceAll("_"); - } - - /** - * Throw an exception if the metric label name is invalid. - */ - static void checkMetricLabelName(String name) { - if (!METRIC_LABEL_NAME_RE.matcher(name).matches()) { - throw new IllegalArgumentException("Invalid metric label name: " + name); - } - if (RESERVED_METRIC_LABEL_NAME_RE.matcher(name).matches()) { - throw new IllegalArgumentException( - "Invalid metric label name, reserved for internal use: " + name); - } - } - - /** - * Convert a double to its string representation in Go. - */ - static String doubleToGoString(double d) { - if (d == Double.POSITIVE_INFINITY) { - return "+Inf"; - } - if (d == Double.NEGATIVE_INFINITY) { - return "-Inf"; - } - if (Double.isNaN(d)) { - return "NaN"; - } - return Double.toString(d); - } - - private Prometheus() { - } - } -} \ No newline at end of file diff --git a/pulsar-functions/metrics/src/main/resources/prometheus_metricsserver_log4j2.yml b/pulsar-functions/metrics/src/main/resources/prometheus_metricsserver_log4j2.yml deleted file mode 100644 index 3ad41cbedf..0000000000 --- a/pulsar-functions/metrics/src/main/resources/prometheus_metricsserver_log4j2.yml +++ /dev/null @@ -1,29 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -Configuration: - name: pulsar-functions-kubernetes-prometheus-metrics-server - monitorInterval: 30 - Appenders: - # Console - Console: - name: Console - target: SYSTEM_OUT - PatternLayout: - Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" diff --git a/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/FileSinkTest.java b/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/FileSinkTest.java deleted file mode 100644 index a77b8a5837..0000000000 --- a/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/FileSinkTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.metrics.sink; - -import java.io.File; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.Map; - -import org.testng.annotations.Test; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.AfterMethod; -import static org.testng.Assert.assertEquals; -import static org.testng.AssertJUnit.fail; - -/** - * FileSink Tester. - */ -public class FileSinkTest { - - private FileSink fileSink; - private File tmpDir; - - @BeforeMethod - public void before() throws IOException { - fileSink = new FileSink(); - Map conf = new HashMap<>(); - tmpDir = Files.createTempDirectory("filesink").toFile(); - conf.put("filename-output", tmpDir.getAbsolutePath() + "/filesink"); - conf.put("file-maximum", "100"); - fileSink.init(conf); - } - - @AfterMethod - public void after() { - fileSink.close(); - for (File file: tmpDir.listFiles()) { - file.delete(); - } - tmpDir.delete(); - } - - /** - * Method: flush() - */ - @Test - public void testFirstFlushWithoutRecords() throws IOException { - fileSink.flush(); - String content = new String(readFromFile( - new File(tmpDir, "/filesink.0").getAbsolutePath())); - assertEquals("[]", content); - } - - /** - * Method: flush() - */ - @Test - public void testSuccessiveFlushWithoutRecords() throws UnsupportedEncodingException, IOException { - fileSink.flush(); - fileSink.flush(); - String content = new String(readFromFile( - new File(tmpDir, "/filesink.0").getAbsolutePath())); - assertEquals("[]", content); - content = new String(readFromFile(new File(tmpDir, "/filesink.1").getAbsolutePath())); - assertEquals("[]", content); - } - - /** - * Method: init() - */ - @Test - public void testIllegalConf() { - FileSink sink = new 
FileSink(); - Map conf = new HashMap<>(); - try { - sink.init(conf); - fail("Expected IllegalArgumentException."); - } catch (IllegalArgumentException e) { - assertEquals("Require: filename-output", e.getMessage()); - } - - sink = new FileSink(); - conf.put("filename-output", tmpDir.getAbsolutePath() + "/filesink"); - try { - sink.init(conf); - fail("Expected IllegalArgumentException."); - } catch (IllegalArgumentException e) { - assertEquals("Require: file-maximum", e.getMessage()); - } - } - - private String readFromFile(String path) throws IOException { - byte[] encoded = Files.readAllBytes(Paths.get(path)); - return new String(encoded, StandardCharsets.UTF_8); - } -} diff --git a/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/PrometheusSinkTests.java b/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/PrometheusSinkTests.java deleted file mode 100644 index 887427f3ab..0000000000 --- a/pulsar-functions/metrics/src/test/java/org/apache/pulsar/functions/metrics/sink/PrometheusSinkTests.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pulsar.functions.metrics.sink; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.utils.FunctionDetailsUtils; -import org.testng.annotations.Test; -import org.testng.annotations.BeforeMethod; -import static org.testng.Assert.assertEquals; -import static org.testng.AssertJUnit.assertTrue; - -public class PrometheusSinkTests { - - private static final long NOW = System.currentTimeMillis(); - - private final class PrometheusTestSink extends PrometheusSink { - - private PrometheusTestSink() { - } - - @Override - protected void startHttpServer(String path, int port) { - // no need to start the server for tests - } - - public Map> getMetrics() { - return getMetricsCache().asMap(); - } - - long currentTimeMillis() { - return NOW; - } - } - - private Map defaultConf; - private InstanceCommunication.MetricsData records; - - @BeforeMethod - public void before() { - - defaultConf = new HashMap<>(); - defaultConf.put("port", "9999"); - defaultConf.put("path", "test"); - defaultConf.put("flat-metrics", "true"); - defaultConf.put("include-topology-name", "false"); - - InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder(); - InstanceCommunication.MetricsData.DataDigest metric1 = - InstanceCommunication.MetricsData.DataDigest.newBuilder() - .setCount(2).setSum(5).setMax(3).setMin(2).build(); - bldr.putMetrics("metric_1", metric1); - InstanceCommunication.MetricsData.DataDigest metric2 = - InstanceCommunication.MetricsData.DataDigest.newBuilder() - .setCount(3).setSum(6).setMax(3).setMin(1).build(); - bldr.putMetrics("metric_2", metric2); - - records = bldr.build(); - } - - @Test - public void testMetricsGrouping() { - 
PrometheusTestSink sink = new PrometheusTestSink(); - sink.init(defaultConf); - Function.FunctionDetails functionDetails = createFunctionDetails("tenant", "namespace", "functionname"); - sink.processRecord(records, functionDetails); - - final Map> metrics = sink.getMetrics(); - assertTrue(metrics.containsKey(FunctionDetailsUtils.getFullyQualifiedName(functionDetails))); - } - - @Test - public void testResponse() throws IOException { - PrometheusTestSink sink = new PrometheusTestSink(); - sink.init(defaultConf); - Function.FunctionDetails functionDetails = createFunctionDetails("tenant", "namespace", "functionname"); - sink.processRecord(records, functionDetails); - - final List expectedLines = Arrays.asList( - createMetric(functionDetails, "metric_1_count", 2), - createMetric(functionDetails, "metric_1_sum", 5), - createMetric(functionDetails, "metric_1_max", 3), - createMetric(functionDetails, "metric_1_min", 2), - createMetric(functionDetails, "metric_2_count", 3), - createMetric(functionDetails, "metric_2_sum", 6), - createMetric(functionDetails, "metric_2_max", 3), - createMetric(functionDetails, "metric_2_min", 1) - ); - - final Set generatedLines = - new HashSet<>(Arrays.asList(new String(sink.generateResponse()).split("\n"))); - - assertEquals(expectedLines.size(), generatedLines.size()); - - expectedLines.forEach((String line) -> { - assertTrue(generatedLines.contains(line)); - }); - } - - private String createMetric(Function.FunctionDetails functionDetails, - String metric, double value) { - return String.format("pulsar_function_%s" - + "{tenant=\"%s\",namespace=\"%s\",functionname=\"%s\"}" - + " %s %d", - metric, functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), value, NOW); - } - - private Function.FunctionDetails createFunctionDetails(String tenant, String namespace, String name) { - Function.FunctionDetails.Builder bldr = Function.FunctionDetails.newBuilder(); - bldr.setTenant(tenant); - 
bldr.setNamespace(namespace); - bldr.setName(name); - return bldr.build(); - } -} diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml index a27c21b387..6de88a8399 100644 --- a/pulsar-functions/pom.xml +++ b/pulsar-functions/pom.xml @@ -37,7 +37,6 @@ api-java java-examples utils - metrics instance runtime runtime-shaded diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto index 6625539d90..d5f5b79ec6 100644 --- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto +++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto @@ -49,7 +49,7 @@ message FunctionStatus { // expressed in ms since epoch int64 lastInvocationTime = 13; string instanceId = 14; - MetricsData metrics = 15 [deprecated=true]; +// MetricsData metrics = 15 [deprecated=true]; // owner of function-instance string workerId = 16; } @@ -60,13 +60,37 @@ message FunctionStatusList { } message MetricsData { - message DataDigest { - double count = 1; - double sum = 2; - double max = 3; - double min = 4; - } - map metrics = 1; +// message DataDigest { +// double count = 1; +// double sum = 2; +// double max = 3; +// double min = 4; +// } +// map metrics = 1 [deprecated=true]; + + // Total number of records function received from source + int64 receivedTotal = 2; + + // Total number of records processed + int64 processedTotal = 3; + + // Total number of records successfully processed by user function + int64 processedSuccessfullyTotal = 4; + + // Total number of system exceptions thrown + int64 systemExceptionsTotal = 5; + + // Total number of user exceptions thrown + int64 userExceptionsTotal = 6; + + // Average process latency for function + double avgProcessLatency = 7; + + // Timestamp of when the function was last invoked + int64 lastInvocation = 8; + + // User defined metrics + map userMetrics = 9; } message HealthCheckResult { diff --git 
a/pulsar-functions/runtime-shaded/pom.xml b/pulsar-functions/runtime-shaded/pom.xml index 3adc304117..b725bb2e11 100644 --- a/pulsar-functions/runtime-shaded/pom.xml +++ b/pulsar-functions/runtime-shaded/pom.xml @@ -133,7 +133,6 @@ org.apache.pulsar:pulsar-functions-proto org.apache.pulsar:pulsar-functions-utils - org.apache.pulsar:pulsar-functions-metrics org.apache.pulsar:pulsar-functions-instance org.apache.pulsar:pulsar-functions-runtime org.apache.pulsar:pulsar-functions-api diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml index e0b264d78c..f58999c4b2 100644 --- a/pulsar-functions/runtime/pom.xml +++ b/pulsar-functions/runtime/pom.xml @@ -39,12 +39,6 @@ ${project.version} - - org.apache.pulsar - pulsar-functions-metrics - ${project.version} - - org.apache.pulsar pulsar-functions-secrets diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 4dc97ace14..482943be2c 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.Gson; import com.google.protobuf.Empty; -import com.google.protobuf.util.JsonFormat; import com.squareup.okhttp.Response; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -59,7 +58,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; -import org.apache.pulsar.functions.metrics.PrometheusMetricsServer; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; @@ -135,7 +133,6 @@ InstanceConfig instanceConfig, String instanceFile, String extraDependenciesDir, - String prometheusMetricsServerJarFile, String logDirectory, String userCodePkgUrl, String originalCodeFileName, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index 3e06c0367c..bfa0eb38d1 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -76,7 +76,6 @@ private final String javaInstanceJarFile; private final String pythonInstanceFile; private final String extraDependenciesDir; - private final String prometheusMetricsServerJarFile; private final SecretsProviderConfigurator secretsProviderConfigurator; private final String logDirectory = "logs/functions"; private Timer changeConfigMapTimer; @@ -143,7 +142,6 @@ public KubernetesRuntimeFactory(String k8Uri, this.authConfig = authConfig; this.javaInstanceJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/java-instance.jar"; this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py"; - this.prometheusMetricsServerJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/PrometheusMetricsServer.jar"; this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? 
-1 : expectedMetricsCollectionInterval; this.secretsProviderConfigurator = secretsProviderConfigurator; } @@ -182,7 +180,6 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c instanceConfig, instanceFile, extraDependenciesDir, - prometheusMetricsServerJarFile, logDirectory, codePkgUrl, originalCodeFileName, diff --git a/pulsar-functions/worker-shaded/pom.xml b/pulsar-functions/worker-shaded/pom.xml index 8c73a930e5..eedb3a6590 100644 --- a/pulsar-functions/worker-shaded/pom.xml +++ b/pulsar-functions/worker-shaded/pom.xml @@ -121,7 +121,6 @@ org.apache.pulsar:pulsar-functions-proto org.apache.pulsar:pulsar-functions-utils - org.apache.pulsar:pulsar-functions-metrics org.apache.pulsar:pulsar-functions-instance org.apache.pulsar:pulsar-functions-runtime org.apache.pulsar:pulsar-functions-worker diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 8a8fafa1a5..add4013003 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -36,7 +36,6 @@ import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriBuilder; -import io.prometheus.client.CollectorRegistry; import lombok.Setter; import org.apache.commons.lang3.StringUtils; import org.apache.distributedlog.api.namespace.Namespace; @@ -46,6 +45,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.InstanceCommunication; 
@@ -55,10 +55,10 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; /** @@ -301,89 +301,6 @@ public synchronized void removeAssignments(Collection assignments) { } } - /** - * Get status of a function instance. If this worker is not running the function instance, - * @param tenant the tenant the function belongs to - * @param namespace the namespace the function belongs to - * @param functionName the function name - * @param instanceId the function instance id - * @return the function status - */ - public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace, - String functionName, int instanceId, URI uri) { - Assignment assignment; - if (runtimeFactory.externallyManaged()) { - assignment = this.findAssignment(tenant, namespace, functionName, -1); - } else { - assignment = this.findAssignment(tenant, namespace, functionName, instanceId); - } - final String assignedWorkerId = assignment.getWorkerId(); - final String workerId = this.workerConfig.getWorkerId(); - - if (assignment == null) { - InstanceCommunication.FunctionStatus.Builder functionStatusBuilder - = InstanceCommunication.FunctionStatus.newBuilder(); - functionStatusBuilder.setRunning(false); - functionStatusBuilder.setFailureException("Function has not been scheduled"); - return functionStatusBuilder.build(); - } - - InstanceCommunication.FunctionStatus functionStatus = null; - // If I am running worker - if (assignedWorkerId.equals(workerId)) { - FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo( - 
Utils.getFullyQualifiedInstanceId(assignment.getInstance())); - RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); - if (runtimeSpawner != null) { - try { - InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus - .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get()); - functionStatusBuilder.setWorkerId(assignedWorkerId); - functionStatus = functionStatusBuilder.build(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } else { - InstanceCommunication.FunctionStatus.Builder functionStatusBuilder - = InstanceCommunication.FunctionStatus.newBuilder(); - functionStatusBuilder.setRunning(false); - functionStatusBuilder.setInstanceId(String.valueOf(instanceId)); - if (functionRuntimeInfo.getStartupException() != null) { - functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage()); - } - functionStatusBuilder.setWorkerId(assignedWorkerId); - functionStatus = functionStatusBuilder.build(); - } - } else { - // query other worker - - List workerInfoList = this.membershipManager.getCurrentMembership(); - WorkerInfo workerInfo = null; - for (WorkerInfo entry: workerInfoList) { - if (assignment.getWorkerId().equals(entry.getWorkerId())) { - workerInfo = entry; - } - } - if (workerInfo == null) { - InstanceCommunication.FunctionStatus.Builder functionStatusBuilder - = InstanceCommunication.FunctionStatus.newBuilder(); - functionStatusBuilder.setRunning(false); - functionStatusBuilder.setInstanceId(String.valueOf(instanceId)); - functionStatusBuilder.setFailureException("Function has not been scheduled"); - return functionStatusBuilder.build(); - } - - if (uri == null) { - throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build()); - } else { - URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(); - throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); - } - } - - return functionStatus; - } - public Response stopFunctionInstance(String tenant, String namespace, String functionName, int instanceId, boolean restart, URI uri) throws Exception { if (runtimeFactory.externallyManaged()) { @@ -537,6 +454,223 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro } } + /** + * Get stats of a function instance. If this worker is not running the function instance, + * @param tenant the tenant the function belongs to + * @param namespace the namespace the function belongs to + * @param functionName the function name + * @param instanceId the function instance id + * @return jsonObject containing stats for instance + */ + public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(String tenant, String namespace, + String functionName, int instanceId, URI uri) { + Assignment assignment; + if (runtimeFactory.externallyManaged()) { + assignment = this.findAssignment(tenant, namespace, functionName, -1); + } else { + assignment = this.findAssignment(tenant, namespace, functionName, instanceId); + } + + if (assignment == null) { + return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData(); + } + + final String assignedWorkerId = assignment.getWorkerId(); + final String workerId = this.workerConfig.getWorkerId(); + + // If I am running worker + if (assignedWorkerId.equals(workerId)) { + FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo( + Utils.getFullyQualifiedInstanceId(assignment.getInstance())); + RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); + if (runtimeSpawner != null) { + return Utils.getFunctionInstanceStats(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo).getMetrics(); + } + return new 
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData(); + } else { + // query other worker + + List workerInfoList = this.membershipManager.getCurrentMembership(); + WorkerInfo workerInfo = null; + for (WorkerInfo entry: workerInfoList) { + if (assignment.getWorkerId().equals(entry.getWorkerId())) { + workerInfo = entry; + } + } + if (workerInfo == null) { + return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData(); + } + + if (uri == null) { + throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build()); + } else { + URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(); + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); + } + } + } + + /** + * Get stats of all function instances. + * @param tenant the tenant the function belongs to + * @param namespace the namespace the function belongs to + * @param functionName the function name + * @return a list of function statuses + * @throws PulsarAdminException + */ + public FunctionStats getFunctionStats(String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException { + Collection assignments = this.findFunctionAssignments(tenant, namespace, functionName); + + FunctionStats functionStats = new FunctionStats(); + if (assignments.isEmpty()) { + return functionStats; + } + + if (runtimeFactory.externallyManaged()) { + Assignment assignment = assignments.iterator().next(); + boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId()); + if (isOwner) { + int parallelism = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism(); + for (int i = 0; i < parallelism; ++i) { + + FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData = getFunctionInstanceStats(tenant, namespace, + functionName, i, null); + + FunctionStats.FunctionInstanceStats functionInstanceStats = 
new FunctionStats.FunctionInstanceStats(); + functionInstanceStats.setInstanceId(i); + functionInstanceStats.setMetrics(functionInstanceStatsData); + functionStats.addInstance(functionInstanceStats); + } + } else { + // find the hostname/port of the worker who is the owner + + List workerInfoList = this.membershipManager.getCurrentMembership(); + WorkerInfo workerInfo = null; + for (WorkerInfo entry: workerInfoList) { + if (assignment.getWorkerId().equals(entry.getWorkerId())) { + workerInfo = entry; + } + } + if (workerInfo == null) { + return functionStats; + } + + if (uri == null) { + throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build()); + } else { + URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(); + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); + } + } + } else { + for (Assignment assignment : assignments) { + boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId()); + + FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData; + if (isOwner) { + functionInstanceStatsData = getFunctionInstanceStats(tenant, namespace, functionName, + assignment.getInstance().getInstanceId(), null); + } else { + functionInstanceStatsData = this.functionAdmin.functions().getFunctionStats( + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), + assignment.getInstance().getInstanceId()); + } + + FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats(); + functionInstanceStats.setInstanceId(assignment.getInstance().getInstanceId()); + functionInstanceStats.setMetrics(functionInstanceStatsData); + functionStats.addInstance(functionInstanceStats); 
+ } + } + return functionStats.calculateOverall(); + } + + /** + * Get status of a function instance. If this worker is not running the function instance, + * @param tenant the tenant the function belongs to + * @param namespace the namespace the function belongs to + * @param functionName the function name + * @param instanceId the function instance id + * @return the function status + */ + public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace, + String functionName, int instanceId, URI uri) { + Assignment assignment; + if (runtimeFactory.externallyManaged()) { + assignment = this.findAssignment(tenant, namespace, functionName, -1); + } else { + assignment = this.findAssignment(tenant, namespace, functionName, instanceId); + } + + if (assignment == null) { + InstanceCommunication.FunctionStatus.Builder functionStatusBuilder + = InstanceCommunication.FunctionStatus.newBuilder(); + functionStatusBuilder.setRunning(false); + functionStatusBuilder.setFailureException("Function has not been scheduled"); + return functionStatusBuilder.build(); + } + + final String assignedWorkerId = assignment.getWorkerId(); + final String workerId = this.workerConfig.getWorkerId(); + + InstanceCommunication.FunctionStatus functionStatus = null; + // If I am running worker + if (assignedWorkerId.equals(workerId)) { + FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo( + Utils.getFullyQualifiedInstanceId(assignment.getInstance())); + RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); + if (runtimeSpawner != null) { + try { + InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus + .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get()); + functionStatusBuilder.setWorkerId(assignedWorkerId); + functionStatus = functionStatusBuilder.build(); + } catch (InterruptedException | ExecutionException e) { + throw new 
RuntimeException(e); + } + } else { + InstanceCommunication.FunctionStatus.Builder functionStatusBuilder + = InstanceCommunication.FunctionStatus.newBuilder(); + functionStatusBuilder.setRunning(false); + functionStatusBuilder.setInstanceId(String.valueOf(instanceId)); + if (functionRuntimeInfo.getStartupException() != null) { + functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage()); + } + functionStatusBuilder.setWorkerId(assignedWorkerId); + functionStatus = functionStatusBuilder.build(); + } + } else { + // query other worker + + List workerInfoList = this.membershipManager.getCurrentMembership(); + WorkerInfo workerInfo = null; + for (WorkerInfo entry: workerInfoList) { + if (assignment.getWorkerId().equals(entry.getWorkerId())) { + workerInfo = entry; + } + } + if (workerInfo == null) { + InstanceCommunication.FunctionStatus.Builder functionStatusBuilder + = InstanceCommunication.FunctionStatus.newBuilder(); + functionStatusBuilder.setRunning(false); + functionStatusBuilder.setInstanceId(String.valueOf(instanceId)); + functionStatusBuilder.setFailureException("Function has not been scheduled"); + return functionStatusBuilder.build(); + } + + if (uri == null) { + throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build()); + } else { + URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(); + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); + } + } + + return functionStatus; + } + /** * Get statuses of all function instances. 
* @param tenant the tenant the function belongs to @@ -545,8 +679,9 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro * @return a list of function statuses * @throws PulsarAdminException */ - public InstanceCommunication.FunctionStatusList getAllFunctionStatus( - String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException { + public InstanceCommunication.FunctionStatusList getAllFunctionStatus(String tenant, String namespace, + String functionName, URI uri) + throws PulsarAdminException { Collection assignments = this.findFunctionAssignments(tenant, namespace, functionName); @@ -555,6 +690,7 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro return functionStatusListBuilder.build(); } + // TODO refactor the code for externally managed. if (runtimeFactory.externallyManaged()) { Assignment assignment = assignments.iterator().next(); boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java index d2f1504510..77228ac224 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java @@ -18,17 +18,16 @@ */ package org.apache.pulsar.functions.worker; +import org.apache.pulsar.functions.instance.FunctionStatsManager; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeSpawner; -import org.eclipse.jetty.util.ConcurrentHashSet; import org.apache.pulsar.common.util.SimpleTextOutputStream; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; /** @@ -59,30 +58,30 @@ public static void generate(WorkerService workerService, String cluster, SimpleT if (functionRuntime != null) { try { InstanceCommunication.MetricsData metrics = functionRuntime.getMetrics().get(); - for (Map.Entry metricsEntry - : metrics.getMetricsMap().entrySet()) { - String metricName = metricsEntry.getKey(); - InstanceCommunication.MetricsData.DataDigest dataDigest = metricsEntry.getValue(); - String tenant = functionRuntimeInfo.getFunctionInstance() - .getFunctionMetaData().getFunctionDetails().getTenant(); - String namespace = functionRuntimeInfo.getFunctionInstance() - .getFunctionMetaData().getFunctionDetails().getNamespace(); - String name = functionRuntimeInfo.getFunctionInstance() - .getFunctionMetaData().getFunctionDetails().getName(); - int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); - String qualifiedNamespace = String.format("%s/%s", tenant, namespace); + String tenant = functionRuntimeInfo.getFunctionInstance() + .getFunctionMetaData().getFunctionDetails().getTenant(); + String namespace = functionRuntimeInfo.getFunctionInstance() + .getFunctionMetaData().getFunctionDetails().getNamespace(); + String name = functionRuntimeInfo.getFunctionInstance() + .getFunctionMetaData().getFunctionDetails().getName(); + int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); + String qualifiedNamespace = String.format("%s/%s", tenant, namespace); - metric(out, cluster, qualifiedNamespace, name, String.format("%s_count", metricName), - instanceId, dataDigest.getCount()); - metric(out, cluster, qualifiedNamespace, name, String.format("%s_max", metricName), - instanceId, dataDigest.getMax()); - metric(out, cluster, qualifiedNamespace,name, String.format("%s_min", metricName), - instanceId, dataDigest.getMin()); - metric(out, cluster, qualifiedNamespace, 
name, String.format("%s_sum", metricName), - instanceId, dataDigest.getSum()); + metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESS_LATENCY_MS, instanceId, metrics.getAvgProcessLatency()); + metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation()); + metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId, metrics.getProcessedSuccessfullyTotal()); + metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESSED_TOTAL, instanceId, metrics.getProcessedTotal()); + metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal()); + metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId, metrics.getSystemExceptionsTotal()); + metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId, metrics.getUserExceptionsTotal()); + for (Map.Entry userMetricsMapEntry : metrics.getUserMetricsMap().entrySet()) { + String userMetricName = userMetricsMapEntry.getKey(); + Double val = userMetricsMapEntry.getValue(); + metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + userMetricName, instanceId, val); } + } catch (InterruptedException | ExecutionException e) { log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 41f04af445..2cd51b84ea 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -23,7 +23,12 @@ import java.net.URL; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; +import java.util.stream.Collectors; + import lombok.extern.slf4j.Slf4j; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -37,6 +42,11 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.policies.data.FunctionStats; +import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.runtime.Runtime; +import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.worker.dlog.DLInputStream; import org.apache.pulsar.functions.worker.dlog.DLOutputStream; import org.apache.zookeeper.KeeperException.Code; @@ -194,5 +204,53 @@ public static String getFullyQualifiedInstanceId(String tenant, String namespace String functionName, int instanceId) { return String.format("%s/%s/%s:%d", tenant, namespace, functionName, instanceId); } - + + public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName, FunctionRuntimeInfo functionRuntimeInfo) { + RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); + + FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats(); + if (functionRuntimeSpawner != null) { + Runtime functionRuntime = 
functionRuntimeSpawner.getRuntime(); + if (functionRuntime != null) { + try { + + InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get(); + int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); + functionInstanceStats.setInstanceId(instanceId); + + FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData + = new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData(); + + functionInstanceStatsData.setReceivedTotal(metricsData.getReceivedTotal()); + functionInstanceStatsData.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal()); + functionInstanceStatsData.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal()); + functionInstanceStatsData.setUserExceptionsTotal(metricsData.getUserExceptionsTotal()); + functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency()); + functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation()); + + // Filter out values that are NaN + Map statsDataMap = metricsData.getUserMetricsMap().entrySet().stream() + .filter(stringDoubleEntry -> !stringDoubleEntry.getValue().isNaN()) + .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue())); + + functionInstanceStatsData.setUserMetrics(statsDataMap); + + functionInstanceStats.setMetrics(functionInstanceStatsData); + } catch (InterruptedException | ExecutionException e) { + log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e); + } + } + } + return functionInstanceStats; + } + + public static FunctionStats getFunctionStats(Map functionRuntimes) { + FunctionStats functionStats = new FunctionStats(); + for (Map.Entry entry : functionRuntimes.entrySet()) { + String fullyQualifiedInstanceName = entry.getKey(); + FunctionRuntimeInfo functionRuntimeInfo = entry.getValue(); + functionStats.addInstance(Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo)); + } + return 
functionStats; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index af8c050b2a..18de1eb734 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -26,6 +26,7 @@ import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -84,6 +85,7 @@ import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -521,8 +523,18 @@ public Response getFunctionInfo(final String tenant, final String namespace, fin return Response.status(Status.OK).entity(retval).build(); } - public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName, - final String componentType, final String instanceId, URI uri) throws IOException { + public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName, + final String componentType, final String instanceId, URI uri) { + return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, false, uri); + } + + public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName, + final String componentType, final String instanceId, URI uri) { + return stopFunctionInstance(tenant, namespace, componentName, componentType, 
instanceId, true, uri); + } + + public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName, + final String componentType, final String instanceId, boolean restart, URI uri) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -532,58 +544,49 @@ public Response getFunctionInstanceStatus(final String tenant, final String name try { validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId); } catch (IllegalArgumentException e) { - log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); + log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { - log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName); + log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); } + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); if (!calculateSubjectType(functionMetaData).equals(componentType)) { log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); } - int instanceIdInt = Integer.parseInt(instanceId); - if (instanceIdInt < 0 || instanceIdInt >= 
functionMetaData.getFunctionDetails().getParallelism()) { - log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName); - return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Invalid InstanceId"))).build(); - } FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); - FunctionStatus functionStatus = null; try { - functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, componentName, - Integer.parseInt(instanceId), uri); + return functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName, + Integer.parseInt(instanceId), restart, uri); } catch (WebApplicationException we) { throw we; } catch (Exception e) { - log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e); + log.error("Failed to restart {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e); return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); } - - String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatus); - return Response.status(Status.OK).entity(jsonResponse).build(); } - public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName, - final String componentType, final String instanceId, URI uri) { - return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, false, uri); + public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName, + final String componentType) { + return stopFunctionInstances(tenant, namespace, componentName, componentType, false); } - public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName, - final String componentType, final String instanceId, URI uri) { - return 
stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, true, uri); + public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName, + final String componentType) { + return stopFunctionInstances(tenant, namespace, componentName, componentType, true); } - public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName, - final String componentType, final String instanceId, boolean restart, URI uri) { + public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName, + final String componentType, boolean restart) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -591,7 +594,7 @@ public Response stopFunctionInstance(final String tenant, final String namespace // validate parameters try { - validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId); + validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); } catch (IllegalArgumentException e) { log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) @@ -600,7 +603,7 @@ public Response stopFunctionInstance(final String tenant, final String namespace FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { - log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); + log.error("{} in getFunctionStatus does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); } @@ -614,28 +617,17 @@ public Response 
stopFunctionInstance(final String tenant, final String namespace FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); try { - return functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName, - Integer.parseInt(instanceId), restart, uri); + return functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart); } catch (WebApplicationException we) { throw we; } catch (Exception e) { - log.error("Failed to restart {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e); + log.error("Failed to restart {}: {}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); } } - public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName, - final String componentType) { - return stopFunctionInstances(tenant, namespace, componentName, componentType, false); - } - - public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName, - final String componentType) { - return stopFunctionInstances(tenant, namespace, componentName, componentType, true); - } - - public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName, - final String componentType, boolean restart) { + public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName, + final String componentType, final String instanceId, URI uri) throws IOException { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -643,36 +635,46 @@ public Response stopFunctionInstances(final String tenant, final String namespac // validate parameters try { - validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); + validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, 
instanceId); } catch (IllegalArgumentException e) { - log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); + log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { - log.error("{} in getFunctionStatus does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); + log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); } - FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); if (!calculateSubjectType(functionMetaData).equals(componentType)) { log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); } + int instanceIdInt = Integer.parseInt(instanceId); + if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) { + log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("Invalid InstanceId"))).build(); + } FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); + FunctionStatus functionStatus = null; try { - return 
functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart); + functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, componentName, + Integer.parseInt(instanceId), uri); } catch (WebApplicationException we) { throw we; } catch (Exception e) { - log.error("Failed to restart {}: {}/{}/{}", componentType, tenant, namespace, componentName, e); - return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); + log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new ErrorData(e.getMessage())).build(); } + + String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatus); + return Response.status(Status.OK).entity(jsonResponse).build(); } public Response getFunctionStatus(final String tenant, final String namespace, final String componentName, @@ -713,18 +715,109 @@ public Response getFunctionStatus(final String tenant, final String namespace, f } catch (WebApplicationException we) { throw we; } catch (Exception e) { - log.error("Got Exception Getting Status", e); - FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder(); - functionStatusBuilder.setRunning(false); - String functionDetailsJson = org.apache.pulsar.functions.utils.Utils - .printJson(functionStatusBuilder.build()); - return Response.status(Status.OK).entity(functionDetailsJson).build(); + log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new ErrorData(e.getMessage())).build(); } String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatusList); return Response.status(Status.OK).entity(jsonResponse).build(); } + private final Gson gson = new GsonBuilder().setPrettyPrinting().create(); + + public Response getFunctionStats(final String tenant, final String 
namespace, final String componentName, + final String componentType, URI uri) throws IOException { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + + // validate parameters + try { + validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); + } catch (IllegalArgumentException e) { + log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); + } + + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if (!calculateSubjectType(functionMetaData).equals(componentType)) { + log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); + } + + FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); + FunctionStats functionStats; + try { + functionStats = functionRuntimeManager.getFunctionStats(tenant, namespace, componentName, uri); + } catch (WebApplicationException we) { + throw we; + } catch (Exception e) { + log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new 
ErrorData(e.getMessage())).build(); + } + + return Response.status(Status.OK).entity(gson.toJson(functionStats)).build(); + + } + + public Response getFunctionsInstanceStats(final String tenant, final String namespace, final String componentName, + final String componentType, String instanceId, URI uri) { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + + // validate parameters + try { + validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId); + } catch (IllegalArgumentException e) { + log.error("Invalid get {} Stats request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} in get {} Stats does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); + } + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if (!calculateSubjectType(functionMetaData).equals(componentType)) { + log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); + } + int instanceIdInt = Integer.parseInt(instanceId); + if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) { + log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", componentType, tenant, 
namespace, componentName); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("Invalid InstanceId"))).build(); + } + + FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); + FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData; + try { + functionInstanceStatsData = functionRuntimeManager.getFunctionInstanceStats(tenant, namespace, componentName, + Integer.parseInt(instanceId), uri); + } catch (WebApplicationException we) { + throw we; + } catch (Exception e) { + log.error("{}/{}/{} Got Exception Getting Stats", tenant, namespace, componentName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(new ErrorData(e.getMessage())).build(); + } + + return Response.status(Status.OK).entity(gson.toJson(functionInstanceStatsData)).build(); + } + public Response listFunctions(final String tenant, final String namespace, String componentType) { if (!isWorkerServiceAvailable()) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java index 88c0a17f0a..bafb7a9475 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -19,15 +19,14 @@ package org.apache.pulsar.functions.worker.rest.api; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.functions.proto.Function; -import 
org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics; -import org.apache.pulsar.functions.runtime.Runtime; -import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.worker.*; import javax.ws.rs.WebApplicationException; @@ -36,7 +35,6 @@ import javax.ws.rs.core.Response.Status; import java.io.*; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkNotNull; @@ -46,6 +44,8 @@ private final Supplier workerServiceSupplier; + private final Gson gson = new GsonBuilder().setPrettyPrinting().create(); + public WorkerImpl(Supplier workerServiceSupplier) { this.workerServiceSupplier = workerServiceSupplier; } @@ -122,16 +122,16 @@ public boolean isSuperUser(String clientRole) { return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole); } - public List getWorkerMetrcis(String clientRole) throws IOException { + public List getWorkerMetrics(String clientRole) throws IOException { if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(clientRole + " is not authorize to get metrics")).build()); } - return getWorkerMetrcis(); + return getWorkerMetrics(); } - private List getWorkerMetrcis() { + private List getWorkerMetrics() { if (!isWorkerServiceAvailable()) { throw new WebApplicationException( Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON) @@ -149,6 +149,13 @@ public Response getFunctionsMetrics(String clientRole) throws IOException { return getFunctionsMetrics(); } 
+ @Data + public static class WorkerFunctionInstanceStats { + /** fully qualified function instance name **/ + public String name; + public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics; + } + private Response getFunctionsMetrics() throws IOException { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -158,41 +165,22 @@ private Response getFunctionsMetrics() throws IOException { Map functionRuntimes = workerService.getFunctionRuntimeManager() .getFunctionRuntimeInfos(); - Metrics.Builder metricsBuilder = Metrics.newBuilder(); + JsonArray metricsMapList = new JsonArray(); + for (Map.Entry entry : functionRuntimes.entrySet()) { String fullyQualifiedInstanceName = entry.getKey(); FunctionRuntimeInfo functionRuntimeInfo = entry.getValue(); - RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); - - if (functionRuntimeSpawner != null) { - Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); - if (functionRuntime != null) { - try { - InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get(); - - String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() - .getFunctionDetails().getTenant(); - String namespace = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() - .getFunctionDetails().getNamespace(); - String name = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() - .getFunctionDetails().getName(); - int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); - String qualifiedFunctionName = String.format("%s/%s/%s", tenant, namespace, name); - - InstanceMetrics.Builder instanceBuilder = InstanceMetrics.newBuilder(); - instanceBuilder.setName(qualifiedFunctionName); - instanceBuilder.setInstanceId(instanceId); - if (metricsData != null) { - instanceBuilder.setMetricsData(metricsData); - } - metricsBuilder.addMetrics(instanceBuilder.build()); - } catch (InterruptedException | ExecutionException e) { - 
log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e); - } - } - } + + FunctionStats.FunctionInstanceStats functionInstanceStats = Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo); + + WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats(); + workerFunctionInstanceStats.setName(fullyQualifiedInstanceName); + workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics()); + + metricsMapList.add(gson.toJsonTree(workerFunctionInstanceStats)); } - String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder); + String jsonResponse = gson.toJson(metricsMapList); + return Response.status(Status.OK).entity(jsonResponse).build(); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index e37a88a38a..3412d2e96a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -19,6 +19,8 @@ package org.apache.pulsar.functions.worker.rest.api.v2; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.policies.data.FunctionStats; +import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.worker.rest.FunctionApiResource; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; @@ -120,12 +122,37 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant, } @GET - @Path("/{tenant}/{namespace}") - public Response listFunctions(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace) { - 
return functions.listFunctions( - tenant, namespace, FunctionsImpl.FUNCTION); + @ApiOperation( + value = "Displays the stats of a Pulsar Function", + response = FunctionStats.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + @Path("/{tenant}/{namespace}/{functionName}/stats") + public Response getFunctionStats(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) throws IOException { + return functions.getFunctionStats(tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri()); + } + @GET + @ApiOperation( + value = "Displays the stats of a Pulsar Function instance", + response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats") + public Response getFunctionInstanceStats(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) throws IOException { + return functions.getFunctionsInstanceStats( + tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri()); } @POST diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java index c802da7c6f..022ebd3126 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java +++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java @@ -80,12 +80,12 @@ public String clientAppId() { @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List") @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") }) public Collection getMetrics() throws Exception { - return worker.getWorkerMetrcis(clientAppId()); + return worker.getWorkerMetrics(clientAppId()); } @GET @Path("/functionsmetrics") - @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) + @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = WorkerImpl.WorkerFunctionInstanceStats.class, responseContainer = "List") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 503, message = "Worker service is not running") }) public Response getFunctionsMetrics() throws IOException { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 490be77732..20e4897559 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -28,9 +28,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import 
org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.functions.metrics.MetricsSink; import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.proto.InstanceCommunication; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -41,7 +39,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import static org.mockito.Matchers.any; @@ -59,29 +56,6 @@ @Slf4j public class FunctionRuntimeManagerTest { - public static class TestSink implements MetricsSink { - - @Override - public void init(Map conf) { - - } - - @Override - public void processRecord(InstanceCommunication.MetricsData record, Function.FunctionDetails functionDetails) { - - } - - @Override - public void flush() { - - } - - @Override - public void close() { - - } - } - @Test public void testProcessAssignmentUpdateAddFunctions() throws Exception { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java index c957de7e4b..21a2852d99 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java @@ -86,13 +86,13 @@ public void testFunctionsStatsGenerate() { CompletableFuture metricsDataCompletableFuture = new CompletableFuture<>(); InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder() - .putMetrics( - "pulsar_function_processed_total", - InstanceCommunication.MetricsData.DataDigest.newBuilder() - .setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build()) - .putMetrics("pulsar_function_process_latency_ms", - 
InstanceCommunication.MetricsData.DataDigest.newBuilder() - .setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build()) + .setReceivedTotal(101) + .setProcessedTotal(100) + .setProcessedSuccessfullyTotal(99) + .setAvgProcessLatency(10.0) + .setUserExceptionsTotal(3) + .setSystemExceptionsTotal(1) + .setLastInvocation(1542324900) .build(); metricsDataCompletableFuture.complete(metricsData); @@ -121,67 +121,61 @@ public void testFunctionsStatsGenerate() { FunctionsStatsGenerator.generate(workerService, "default", statsOut); String str = buf.toString(Charset.defaultCharset()); + buf.release(); Map metrics = parseMetrics(str); - Assert.assertEquals(metrics.size(), 8); + Assert.assertEquals(metrics.size(), 7); System.out.println("metrics: " + metrics); - Metric m = metrics.get("pulsar_function_processed_total_count"); - assertEquals(m.tags.get("cluster"), "default"); - assertEquals(m.tags.get("instanceId"), "0"); - assertEquals(m.tags.get("name"), "func-1"); - assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 100.0); - - m = metrics.get("pulsar_function_processed_total_max"); + Metric m = metrics.get("pulsar_function_received_total"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 200.0); + assertEquals(m.value, 101.0); - m = metrics.get("pulsar_function_processed_total_sum"); + m = metrics.get("pulsar_function_processed_total"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 300.0); + assertEquals(m.value, 100.0); - m = metrics.get("pulsar_function_processed_total_min"); + m = metrics.get("pulsar_function_user_exceptions_total"); assertEquals(m.tags.get("cluster"), 
"default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 0.0); + assertEquals(m.value, 3.0); - m = metrics.get("pulsar_function_process_latency_ms_count"); + m = metrics.get("pulsar_function_process_latency_ms"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); assertEquals(m.value, 10.0); - m = metrics.get("pulsar_function_process_latency_ms_max"); + m = metrics.get("pulsar_function_system_exceptions_total"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 20.0); + assertEquals(m.value, 1.0); - m = metrics.get("pulsar_function_process_latency_ms_sum"); + m = metrics.get("pulsar_function_last_invocation"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 30.0); + assertEquals(m.value, 1542324900.0); - m = metrics.get("pulsar_function_process_latency_ms_min"); + m = metrics.get("pulsar_function_processed_successfully_total"); assertEquals(m.tags.get("cluster"), "default"); assertEquals(m.tags.get("instanceId"), "0"); assertEquals(m.tags.get("name"), "func-1"); assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); - assertEquals(m.value, 0.0); + assertEquals(m.value, 99.0); } /** diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml index 4f49eef66e..70bc75764c 100644 --- a/pulsar-spark/pom.xml +++ b/pulsar-spark/pom.xml @@ -34,34 +34,6 @@ Spark Streaming Pulsar Receivers - - ${project.groupId} - 
pulsar-broker - ${project.version} - test - - - - ${project.groupId} - pulsar-broker - ${project.version} - test - test-jar - - - - org.mockito - mockito-core - test - - - - ${project.groupId} - managed-ledger-original - ${project.version} - test - test-jar - ${project.groupId} @@ -97,7 +69,6 @@ true - ture com.google.guava:guava diff --git a/tests/pom.xml b/tests/pom.xml index 6c4b094d4c..e6f6ba7e67 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -36,6 +36,7 @@ integration pulsar-kafka-compat-client-test pulsar-storm-test + pulsar-spark-test diff --git a/tests/pulsar-spark-test/pom.xml b/tests/pulsar-spark-test/pom.xml new file mode 100644 index 0000000000..fae0bda937 --- /dev/null +++ b/tests/pulsar-spark-test/pom.xml @@ -0,0 +1,81 @@ + + + 4.0.0 + + + org.apache.pulsar.tests + tests-parent + 2.3.0-SNAPSHOT + + + pulsar-spark-test + jar + Spark Streaming Pulsar Receivers Tests + + + + + org.apache.pulsar + pulsar-spark + ${project.version} + + + org.apache.pulsar + pulsar-client + + + + + + org.apache.pulsar + pulsar-broker + ${project.version} + + + + org.apache.pulsar + pulsar-broker + ${project.version} + test-jar + + + + org.apache.pulsar + managed-ledger-original + ${project.version} + test-jar + + + + org.mockito + mockito-core + + + + org.apache.spark + spark-streaming_2.10 + + + + diff --git a/pulsar-spark/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java similarity index 100% rename from pulsar-spark/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java rename to tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java diff --git a/pulsar-spark/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java similarity index 100% 
rename from pulsar-spark/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java rename to tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services