pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] srkukarni closed pull request #3181: Fix Spellings and Code Cleanup
Date Thu, 13 Dec 2018 14:15:36 GMT
srkukarni closed pull request #3181: Fix Spellings and Code Cleanup
URL: https://github.com/apache/pulsar/pull/3181
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
index 03fc9a2584..43b9ba4d29 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
@@ -87,7 +87,7 @@
 
     @GET
     @Path("/destinations")
-    @ApiOperation(value = "Get all the topic stats by namesapce", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558
+    @ApiOperation(value = "Get all the topic stats by namespace", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558
                                                                                                                                            // map
                                                                                                                                            // support
                                                                                                                                            // missing
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index c4d504c06d..900ad7c23b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -117,7 +117,7 @@
             @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
             @ApiResponse(code = 404, message = "Configuration not found"),
             @ApiResponse(code = 412, message = "Configuration can't be updated dynamically") })
-    public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{
+    public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception {
         validateSuperUserAccess();
         updateDynamicConfigurationOnZk(configName, configValue);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 3fcb7d6f60..b65d268815 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -344,7 +344,7 @@ private void deleteFailureDomain(String clusterPath) {
                     .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
-            // construct the response to NamespaceisolationData map
+            // construct the response to Namespace isolation data map
             return nsIsolationPolicies.getPolicies();
         } catch (Exception e) {
             log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
@@ -368,7 +368,7 @@ public NamespaceIsolationData getNamespaceIsolationPolicy(@PathParam("cluster")
                     .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
-            // construct the response to NamespaceisolationData map
+            // construct the response to Namespace isolation data map
             if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
                 log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}", policyName, cluster);
                 throw new RestException(Status.NOT_FOUND,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
index 2c664f0f10..3f502daaa1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
@@ -94,7 +94,7 @@ protected void internalSetNamespaceBundleResourceQuota(String bundleRange, Resou
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to set resource quota for namespace bundle {}: concurrent modification",
                     clientAppId(), nsBundle.toString());
-            throw new RestException(Status.CONFLICT, "Cuncurrent modification on namespace bundle quota");
+            throw new RestException(Status.CONFLICT, "Concurrent modification on namespace bundle quota");
         } catch (Exception e) {
             log.error("[{}] Failed to set resource quota for namespace bundle {}", clientAppId(), nsBundle.toString());
             throw new RestException(e);
@@ -123,7 +123,7 @@ protected void internalRemoveNamespaceBundleResourceQuota(String bundleRange) {
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to unset resource quota for namespace bundle {}: concurrent modification",
                     clientAppId(), nsBundle.toString());
-            throw new RestException(Status.CONFLICT, "Cuncurrent modification on namespace bundle quota");
+            throw new RestException(Status.CONFLICT, "Concurrent modification on namespace bundle quota");
         } catch (Exception e) {
             log.error("[{}] Failed to unset resource quota for namespace bundle {}", clientAppId(),
                     nsBundle.toString());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
index 5fd57839f9..a04fded730 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
@@ -43,7 +43,7 @@
     @GET
     @Path("/topics")
     @ApiOperation(
-            value = "Get all the topic stats by namesapce",
+            value = "Get all the topic stats by namespace",
             response = OutputStream.class,
             responseContainer = "OutputStream")
     // https://github.com/swagger-api/swagger-ui/issues/558
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 47ce6e493f..5d1b9c02a1 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -86,7 +86,7 @@ public CmdSinks(PulsarAdmin admin) {
         jcommander.addCommand("delete", deleteSink);
         jcommander.addCommand("list", listSinks);
         jcommander.addCommand("get", getSink);
-        // TODO depecreate getstatus
+        // TODO deprecate getstatus
         jcommander.addCommand("status", getSinkStatus, "getstatus");
         jcommander.addCommand("stop", stopSink);
         jcommander.addCommand("restart", restartSink);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 9ec950a446..7acd85b8f0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -382,7 +382,7 @@ void processArguments() throws Exception {
 
         protected void validateSourceConfigs(SourceConfig sourceConfig) {
             if (isBlank(sourceConfig.getArchive())) {
-                throw new ParameterException("Source archive not specfied");
+                throw new ParameterException("Source archive not specified");
             }
             org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
             if (!Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) &&
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 2eb951a97f..bf9c07934e 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -178,7 +178,7 @@ public void testMissingProcessingGuarantees() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specfied")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specified")
     public void testMissingArchive() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
         sourceConfig.setArchive(null);
@@ -356,7 +356,7 @@ public void testCmdSourceConfigFileMissingResources() throws Exception {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specfied")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specified")
     public void testCmdSourceConfigFileMissingJar() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
         testSourceConfig.setArchive(null);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index dd2dce2d83..fa8f7830c5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -153,12 +153,16 @@ public ComponentImpl(Supplier<WorkerService> workerServiceSupplier, ComponentTyp
 
         public abstract T notScheduledInstance();
 
-        public abstract T fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String assignedWorkerId);
+        public abstract T fromFunctionStatusProto(final InstanceCommunication.FunctionStatus status,
+                                                  final String assignedWorkerId);
 
-        public abstract T notRunning(String assignedWorkerId, String error);
+        public abstract T notRunning(final String assignedWorkerId, final String error);
 
-        public T getComponentInstanceStatus(String tenant, String namespace,
-                                            String name, int instanceId, URI uri) {
+        public T getComponentInstanceStatus(final String tenant,
+                                            final String namespace,
+                                            final String name,
+                                            final int instanceId,
+                                            final URI uri) {
 
             Function.Assignment assignment;
             if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
@@ -218,16 +222,23 @@ public T getComponentInstanceStatus(String tenant, String namespace,
             }
         }
 
-        public abstract S getStatus(String tenant, String namespace,
-                                    String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException;
+        public abstract S getStatus(final String tenant,
+                                    final String namespace,
+                                    final String name,
+                                    final Collection<Function.Assignment> assignments,
+                                    final URI uri) throws PulsarAdminException;
 
-        public abstract S getStatusExternal(String tenant, String namespace,
-                                            String name, int parallelism);
+        public abstract S getStatusExternal(final String tenant,
+                                            final String namespace,
+                                            final String name,
+                                            final int parallelism);
 
-        public abstract S emptyStatus(int parallelism);
+        public abstract S emptyStatus(final int parallelism);
 
-        public S getComponentStatus(String tenant, String namespace,
-                                    String name, URI uri) {
+        public S getComponentStatus(final String tenant,
+                                    final String namespace,
+                                    final String name,
+                                    final URI uri) {
 
             Function.FunctionMetaData functionMetaData = worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, name);
 
@@ -291,10 +302,14 @@ boolean isWorkerServiceAvailable() {
         return true;
     }
 
-    public Response registerFunction(final String tenant, final String namespace, final String componentName,
-                                     final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-                                     final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
-
+    public Response registerFunction(final String tenant,
+                                     final String namespace,
+                                     final String componentName,
+                                     final InputStream uploadedInputStream,
+                                     final FormDataContentDisposition fileDetail,
+                                     final String functionPkgUrl,
+                                     final String functionDetailsJson,
+                                     final String componentConfigJson,
                                      final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
@@ -315,9 +330,11 @@ public Response registerFunction(final String tenant, final String namespace, fi
         }
 
         try {
-            TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
-            String qualifedNamespace = tenant + "/" + namespace;
-            if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifedNamespace)) {
+            // Check tenant exists
+            final TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifiedNamespace)) {
                 log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace,
                         componentName, namespace);
                 return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -409,10 +426,10 @@ public Response registerFunction(final String tenant, final String namespace, fi
         return updateRequest(functionMetaDataBuilder.build());
     }
 
-    private PackageLocationMetaData.Builder getFunctionPackageLocation(FunctionDetails functionDetails,
-                                                                       String functionPkgUrl,
+    private PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionDetails functionDetails,
+                                                                       final String functionPkgUrl,
                                                                        final FormDataContentDisposition fileDetail,
-                                                                       File uploadedInputStreamAsFile) throws Exception {
+                                                                       final File uploadedInputStreamAsFile) throws Exception {
         String tenant = functionDetails.getTenant();
         String namespace = functionDetails.getNamespace();
         String componentName = functionDetails.getName();
@@ -466,9 +483,14 @@ public Response registerFunction(final String tenant, final String namespace, fi
     }
 
 
-    public Response updateFunction(final String tenant, final String namespace, final String componentName,
-                                   final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-                                   final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
+    public Response updateFunction(final String tenant,
+                                   final String namespace,
+                                   final String componentName,
+                                   final InputStream uploadedInputStream,
+                                   final FormDataContentDisposition fileDetail,
+                                   final String functionPkgUrl,
+                                   final String functionDetailsJson,
+                                   final String componentConfigJson,
                                    final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
@@ -516,7 +538,7 @@ public Response updateFunction(final String tenant, final String namespace, fina
             FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new Gson().toJson(existingFunctionConfig);
             FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            // The rest end points take precendence over whatever is there in functionconfig
+            // The rest end points take precedence over whatever is there in functionconfig
             functionConfig.setTenant(tenant);
             functionConfig.setNamespace(namespace);
             functionConfig.setName(componentName);
@@ -531,7 +553,7 @@ public Response updateFunction(final String tenant, final String namespace, fina
             SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new Gson().toJson(existingSourceConfig);
             SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
-            // The rest end points take precendence over whatever is there in functionconfig
+            // The rest end points take precedence over whatever is there in functionconfig
             sourceConfig.setTenant(tenant);
             sourceConfig.setNamespace(namespace);
             sourceConfig.setName(componentName);
@@ -546,7 +568,7 @@ public Response updateFunction(final String tenant, final String namespace, fina
             SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new Gson().toJson(existingSinkConfig);
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
-            // The rest end points take precendence over whatever is there in functionconfig
+            // The rest end points take precedence over whatever is there in functionconfig
             sinkConfig.setTenant(tenant);
             sinkConfig.setNamespace(namespace);
             sinkConfig.setName(componentName);
@@ -570,7 +592,6 @@ public Response updateFunction(final String tenant, final String namespace, fina
         if (uploadedInputStream != null) {
             uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream);
         }
-        File existingPackageAsFile = null;
 
         // validate parameters
         try {
@@ -581,7 +602,8 @@ public Response updateFunction(final String tenant, final String namespace, fina
                 functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
                         fileDetail, functionDetailsJson, mergedComponentConfigJson, componentType);
             } else {
-                functionDetails = validateUpdateRequestParamsWithExistingMetadata(tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType);
+                functionDetails = validateUpdateRequestParamsWithExistingMetadata(
+                        tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType);
             }
         } catch (Exception e) {
             log.error("Invalid update {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
@@ -618,8 +640,10 @@ public Response updateFunction(final String tenant, final String namespace, fina
         return updateRequest(functionMetaDataBuilder.build());
     }
 
-    public Response deregisterFunction(final String tenant, final String namespace, final String componentName,
-                                       String clientRole) {
+    public Response deregisterFunction(final String tenant,
+                                       final String namespace,
+                                       final String componentName,
+                                       final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -702,8 +726,9 @@ public Response deregisterFunction(final String tenant, final String namespace,
         return Response.status(Status.OK).entity(requestResult.toJson()).build();
     }
 
-    public Response getFunctionInfo(final String tenant, final String namespace, final String componentName)
-            throws IOException {
+    public Response getFunctionInfo(final String tenant,
+                                    final String namespace,
+                                    final String componentName) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -731,32 +756,42 @@ public Response getFunctionInfo(final String tenant, final String namespace, fin
                     .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
 
-        String retval;
+        String retVal;
         if (componentType.equals(FUNCTION)) {
             FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retval = new Gson().toJson(config);
+            retVal = new Gson().toJson(config);
         } else if (componentType.equals(SOURCE)) {
             SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retval = new Gson().toJson(config);
+            retVal = new Gson().toJson(config);
         } else {
             SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retval = new Gson().toJson(config);
+            retVal = new Gson().toJson(config);
         }
-        return Response.status(Status.OK).entity(retval).build();
+        return Response.status(Status.OK).entity(retVal).build();
     }
 
-    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
-                                         final String instanceId, URI uri) {
+    public Response stopFunctionInstance(final String tenant,
+                                         final String namespace,
+                                         final String componentName,
+                                         final String instanceId,
+                                         final URI uri) {
         return stopFunctionInstance(tenant, namespace, componentName, instanceId, false, uri);
     }
 
-    public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName,
-                                            final String instanceId, URI uri) {
+    public Response restartFunctionInstance(final String tenant,
+                                            final String namespace,
+                                            final String componentName,
+                                            final String instanceId,
+                                            final URI uri) {
         return stopFunctionInstance(tenant, namespace, componentName, instanceId, true, uri);
     }
 
-    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
-                                         final String instanceId, boolean restart, URI uri) {
+    public Response stopFunctionInstance(final String tenant,
+                                         final String namespace,
+                                         final String componentName,
+                                         final String instanceId,
+                                         final boolean restart,
+                                         final URI uri) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -797,16 +832,22 @@ public Response stopFunctionInstance(final String tenant, final String namespace
         }
     }
 
-    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName) {
+    public Response stopFunctionInstances(final String tenant,
+                                          final String namespace,
+                                          final String componentName) {
         return stopFunctionInstances(tenant, namespace, componentName, false);
     }
 
-    public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName) {
+    public Response restartFunctionInstances(final String tenant,
+                                             final String namespace,
+                                             final String componentName) {
         return stopFunctionInstances(tenant, namespace, componentName, true);
     }
 
-    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
-                                          boolean restart) {
+    public Response stopFunctionInstances(final String tenant,
+                                          final String namespace,
+                                          final String componentName,
+                                          final boolean restart) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -846,8 +887,10 @@ public Response stopFunctionInstances(final String tenant, final String namespac
         }
     }
 
-    public FunctionStats getFunctionStats(final String tenant, final String namespace, final String componentName,
-                                          URI uri) throws IOException {
+    public FunctionStats getFunctionStats(final String tenant,
+                                          final String namespace,
+                                          final String componentName,
+                                          final URI uri) {
         if (!isWorkerServiceAvailable()) {
             throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
         }
@@ -887,8 +930,11 @@ public FunctionStats getFunctionStats(final String tenant, final String namespac
 
     }
 
-    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(final String tenant, final String namespace, final String componentName,
-                                                                                                   String instanceId, URI uri) {
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(final String tenant,
+                                                                                                   final String namespace,
+                                                                                                   final String componentName,
+                                                                                                   final String instanceId,
+                                                                                                   final URI uri) {
         if (!isWorkerServiceAvailable()) {
             throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
         }
@@ -997,14 +1043,17 @@ private Response updateRequest(FunctionMetaData functionMetaData) {
         return this.worker().getConnectorsManager().getConnectors();
     }
 
-    public Response triggerFunction(final String tenant, final String namespace, final String functionName,
-                                    final String input, final InputStream uploadedInputStream, final String topic) {
+    public Response triggerFunction(final String tenant,
+                                    final String namespace,
+                                    final String functionName,
+                                    final String input,
+                                    final InputStream uploadedInputStream,
+                                    final String topic) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
 
-        FunctionDetails functionDetails;
         // validate parameters
         try {
             validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
@@ -1098,8 +1147,10 @@ public Response triggerFunction(final String tenant, final String namespace, fin
         }
     }
 
-    public Response getFunctionState(final String tenant, final String namespace,
-                                     final String functionName, final String key) {
+    public Response getFunctionState(final String tenant,
+                                     final String namespace,
+                                     final String functionName,
+                                     final String key) {
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
@@ -1209,7 +1260,7 @@ public void write(final OutputStream output) throws IOException {
         }).build();
     }
 
-    private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
+    private void validateListFunctionRequestParams(final String tenant, final String namespace) throws IllegalArgumentException {
 
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
@@ -1219,8 +1270,11 @@ private void validateListFunctionRequestParams(String tenant, String namespace)
         }
     }
 
-    protected void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName,
-                                                          ComponentType componentType, String instanceId) throws IllegalArgumentException {
+    protected void validateGetFunctionInstanceRequestParams(final String tenant,
+                                                            final String namespace,
+                                                            final String componentName,
+                                                            final ComponentType componentType,
+                                                            final String instanceId) throws IllegalArgumentException {
         validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         if (instanceId == null) {
             throw new IllegalArgumentException(String.format("%s Instance Id is not provided", componentType));
@@ -1255,10 +1309,14 @@ private void validateDeregisterRequestParams(String tenant, String namespace, St
         }
     }
 
-    private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String componentName,
-                                                                  String functionPkgUrl, String functionDetailsJson, String componentConfigJson,
-                                                                  ComponentType componentType)
-            throws IllegalArgumentException, IOException, URISyntaxException {
+    private FunctionDetails validateUpdateRequestParamsWithPkgUrl(final String tenant,
+                                                                  final String namespace,
+                                                                  final String componentName,
+                                                                  final String functionPkgUrl,
+                                                                  final String functionDetailsJson,
+                                                                  final String componentConfigJson,
+                                                                  final ComponentType componentType)
+            throws IllegalArgumentException, IOException {
         if (!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
             throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
         }
@@ -1267,9 +1325,14 @@ private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, Str
         return functionDetails;
     }
 
-    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
-                                                        File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson,
-                                                        String componentConfigJson, ComponentType componentType)
+    private FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                        final String namespace,
+                                                        final String componentName,
+                                                        final File uploadedInputStreamAsFile,
+                                                        final FormDataContentDisposition fileDetail,
+                                                        final String functionDetailsJson,
+                                                        final String componentConfigJson,
+                                                        final ComponentType componentType)
             throws IllegalArgumentException, IOException {
 
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
@@ -1281,9 +1344,12 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
         return functionDetails;
     }
 
-    private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(String tenant, String namespace, String componentName,
-                                                                            PackageLocationMetaData packageLocationMetaData,
-                                                                            String componentConfigJson, ComponentType componentType) throws Exception {
+    private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(final String tenant,
+                                                                            final String namespace,
+                                                                            final String componentName,
+                                                                            final PackageLocationMetaData packageLocationMetaData,
+                                                                            final String componentConfigJson,
+                                                                            final ComponentType componentType) throws Exception {
         File tmpFile = File.createTempFile("functions", null);
         tmpFile.deleteOnExit();
         Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, packageLocationMetaData.getPackagePath());
@@ -1291,7 +1357,7 @@ private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(String t
                 null, componentConfigJson, componentType, null, tmpFile);
     }
 
-    private static File dumpToTmpFile(InputStream uploadedInputStream) {
+    private static File dumpToTmpFile(final InputStream uploadedInputStream) {
         try {
             File tmpFile = File.createTempFile("functions", null);
             tmpFile.deleteOnExit();
@@ -1302,7 +1368,10 @@ private static File dumpToTmpFile(InputStream uploadedInputStream) {
         }
     }
 
-    private void validateGetFunctionStateParams(String tenant, String namespace, String functionName, String key)
+    private void validateGetFunctionStateParams(final String tenant,
+                                                final String namespace,
+                                                final String functionName,
+                                                final String key)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -1355,9 +1424,14 @@ private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
         return null;
     }
 
-    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
-                                                        String functionDetailsJson, String componentConfigJson, ComponentType componentType,
-                                                        String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException {
+    private FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                        final String namespace,
+                                                        final String componentName,
+                                                        final String functionDetailsJson,
+                                                        final String componentConfigJson,
+                                                        final ComponentType componentType,
+                                                        final String functionPkgUrl,
+                                                        final File uploadedInputStreamAsFile) throws IOException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -1370,7 +1444,7 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
 
         if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
             FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            // The rest end points take precendence over whatever is there in functionconfig
+            // The rest end points take precedence over whatever is there in functionconfig
             functionConfig.setTenant(tenant);
             functionConfig.setNamespace(namespace);
             functionConfig.setName(componentName);
@@ -1381,7 +1455,7 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
         if (componentType.equals(SOURCE)) {
             Path archivePath = null;
             SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
-            // The rest end points take precendence over whatever is there in sourceconfig
+            // The rest end points take precedence over whatever is there in sourceconfig
             sourceConfig.setTenant(tenant);
             sourceConfig.setNamespace(namespace);
             sourceConfig.setName(componentName);
@@ -1403,7 +1477,7 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
         if (componentType.equals(SINK)) {
             Path archivePath = null;
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
-            // The rest end points take precendence over whatever is there in sinkConfig
+            // The rest end points take precedence over whatever is there in sinkConfig
             sinkConfig.setTenant(tenant);
             sinkConfig.setNamespace(namespace);
             sinkConfig.setName(componentName);
@@ -1562,8 +1636,14 @@ private void validateFunctionClassTypes(ClassLoader classLoader, FunctionDetails
         return TypeResolver.resolveRawArgument(funClass, loadedClass);
     }
 
-    private void validateTriggerRequestParams(String tenant, String namespace, String functionName, String topic,
-                                              String input, InputStream uploadedInputStream) {
+    private void validateTriggerRequestParams(final String tenant,
+                                              final String namespace,
+                                              final String functionName,
+                                              final String topic,
+                                              final String input,
+                                              final InputStream uploadedInputStream) {
+        // Note : Checking topic is not required it can be null
+
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -1660,8 +1740,10 @@ protected void componentStatusRequestValidate (final String tenant, final String
         }
     }
 
-    protected void componentInstanceStatusRequestValidate (final String tenant, final String namespace,
-                                                           final String componentName, int instanceId) {
+    protected void componentInstanceStatusRequestValidate (final String tenant,
+                                                           final String namespace,
+                                                           final String componentName,
+                                                           final int instanceId) {
         componentStatusRequestValidate(tenant, namespace, componentName);
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 9a6f739ec8..f3e41a1d54 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -24,13 +24,11 @@
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -155,7 +153,10 @@ public FunctionStatus getStatus(String tenant, String namespace, String name, Co
         }
 
         @Override
-        public FunctionStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
+        public FunctionStatus getStatusExternal(final String tenant,
+                                                final String namespace,
+                                                final String name,
+                                                final int parallelism) {
             FunctionStatus functionStatus = new FunctionStatus();
             for (int i = 0; i < parallelism; ++i) {
                 FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
@@ -177,7 +178,7 @@ public FunctionStatus getStatusExternal(String tenant, String namespace, String
         }
 
         @Override
-        public FunctionStatus emptyStatus(int parallelism) {
+        public FunctionStatus emptyStatus(final int parallelism) {
             FunctionStatus functionStatus = new FunctionStatus();
             functionStatus.setNumInstances(parallelism);
             functionStatus.setNumRunning(0);
@@ -209,8 +210,11 @@ public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
      * @param instanceId the function instance id
      * @return the function status
      */
-    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName,
-                                                                                                      final String instanceId, URI uri) throws IOException {
+    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(final String tenant,
+                                                                                                      final String namespace,
+                                                                                                      final String componentName,
+                                                                                                      final String instanceId,
+                                                                                                      final URI uri) {
 
         // validate parameters
         componentInstanceStatusRequestValidate(tenant, namespace, componentName, Integer.parseInt(instanceId));
@@ -237,8 +241,10 @@ public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
      * @return a list of function statuses
      * @throws PulsarAdminException
      */
-    public FunctionStatus getFunctionStatus(final String tenant, final String namespace, final String componentName,
-                                            URI uri) {
+    public FunctionStatus getFunctionStatus(final String tenant,
+                                            final String namespace,
+                                            final String componentName,
+                                            final URI uri) {
 
         // validate parameters
         componentStatusRequestValidate(tenant, namespace, componentName);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
index b94dfd8f51..9d7b316553 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
@@ -38,7 +37,7 @@
     @Path("metrics")
     @GET
     @Produces(MediaType.TEXT_PLAIN)
-    public Response getMetrics() throws JsonProcessingException {
+    public Response getMetrics() {
 
         WorkerService workerService = get();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 6d70fbba38..862aaf6483 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -24,13 +24,11 @@
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -123,13 +121,18 @@
         }
 
         @Override
-        public SinkStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException {
+        public SinkStatus getStatus(final String tenant,
+                                    final String namespace,
+                                    final String name,
+                                    final Collection<Function.Assignment> assignments,
+                                    final URI uri) throws PulsarAdminException {
             SinkStatus sinkStatus = new SinkStatus();
             for (Function.Assignment assignment : assignments) {
                 boolean isOwner = worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
                 SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData;
                 if (isOwner) {
-                    sinkInstanceStatusData = getComponentInstanceStatus(tenant, namespace, name, assignment.getInstance().getInstanceId(), null);
+                    sinkInstanceStatusData = getComponentInstanceStatus(tenant,
+                            namespace, name, assignment.getInstance().getInstanceId(), null);
                 } else {
                     sinkInstanceStatusData = worker().getFunctionAdmin().sink().getSinkStatus(
                             assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
@@ -154,7 +157,10 @@ public SinkStatus getStatus(String tenant, String namespace, String name, Collec
         }
 
         @Override
-        public SinkStatus getStatusExternal (String tenant, String namespace, String name, int parallelism) {
+        public SinkStatus getStatusExternal(final String tenant,
+                                            final String namespace,
+                                            final String name,
+                                            final int parallelism) {
             SinkStatus sinkStatus = new SinkStatus();
             for (int i = 0; i < parallelism; ++i) {
                 SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData
@@ -176,7 +182,7 @@ public SinkStatus getStatusExternal (String tenant, String namespace, String nam
         }
 
         @Override
-        public SinkStatus emptyStatus(int parallelism) {
+        public SinkStatus emptyStatus(final int parallelism) {
             SinkStatus sinkStatus = new SinkStatus();
             sinkStatus.setNumInstances(parallelism);
             sinkStatus.setNumRunning(0);
@@ -200,9 +206,11 @@ public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
         super(workerServiceSupplier, ComponentType.SINK);
     }
 
-    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(
-            String tenant, String namespace, String sinkName, String instanceId, URI uri)
-            throws IOException {
+    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant,
+                                                                                      final String namespace,
+                                                                                      final String sinkName,
+                                                                                      final String instanceId,
+                                                                                      final URI uri) {
 
         // validate parameters
         componentInstanceStatusRequestValidate(tenant, namespace, sinkName, Integer.parseInt(instanceId));
@@ -221,9 +229,10 @@ public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
         return sinkInstanceStatusData;
     }
 
-    public SinkStatus getSinkStatus(
-            final String tenant, final String namespace,
-            final String componentName, URI uri) {
+    public SinkStatus getSinkStatus(final String tenant,
+                                    final String namespace,
+                                    final String componentName,
+                                    final URI uri) {
 
         // validate parameters
         componentStatusRequestValidate(tenant, namespace, componentName);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index 8c3c988e1f..d915bb0d53 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -24,13 +24,11 @@
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -122,7 +120,11 @@
         }
 
         @Override
-        public SourceStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException {
+        public SourceStatus getStatus(final String tenant,
+                                      final String namespace,
+                                      final String name,
+                                      final Collection<Function.Assignment> assignments,
+                                      final URI uri) throws PulsarAdminException {
             SourceStatus sourceStatus = new SourceStatus();
             for (Function.Assignment assignment : assignments) {
                 boolean isOwner = worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
@@ -153,7 +155,10 @@ public SourceStatus getStatus(String tenant, String namespace, String name, Coll
         }
 
         @Override
-        public SourceStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
+        public SourceStatus getStatusExternal(final String tenant,
+                                              final String namespace,
+                                              final String name,
+                                              final int parallelism) {
             SourceStatus sinkStatus = new SourceStatus();
             for (int i = 0; i < parallelism; ++i) {
                 SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData
@@ -175,7 +180,7 @@ public SourceStatus getStatusExternal(String tenant, String namespace, String na
         }
 
         @Override
-        public SourceStatus emptyStatus(int parallelism) {
+        public SourceStatus emptyStatus(final int parallelism) {
             SourceStatus sourceStatus = new SourceStatus();
             sourceStatus.setNumInstances(parallelism);
             sourceStatus.setNumRunning(0);
@@ -199,8 +204,10 @@ public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
         super(workerServiceSupplier, ComponentType.SOURCE);
     }
 
-    public SourceStatus getSourceStatus(final String tenant, final String namespace,
-                                        final String componentName, URI uri) throws IOException {
+    public SourceStatus getSourceStatus(final String tenant,
+                                        final String namespace,
+                                        final String componentName,
+                                        final URI uri) {
         // validate parameters
         componentStatusRequestValidate(tenant, namespace, componentName);
 
@@ -217,8 +224,11 @@ public SourceStatus getSourceStatus(final String tenant, final String namespace,
         return sourceStatus;
     }
 
-    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(
-            String tenant, String namespace, String sourceName, String instanceId, URI uri) {
+    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(final String tenant,
+                                                                                              final String namespace,
+                                                                                              final String sourceName,
+                                                                                              final String instanceId,
+                                                                                              final URI uri) {
         // validate parameters
         componentInstanceStatusRequestValidate(tenant, namespace, sourceName, Integer.parseInt(instanceId));
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index bafb7a9475..56c945da55 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -27,14 +27,20 @@
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.worker.*;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.MembershipManager;
+import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerService;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
-import java.io.*;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -118,11 +124,11 @@ private Response getUnavailableResponse() {
                 .build();
     }
 
-    public boolean isSuperUser(String clientRole) {
+    public boolean isSuperUser(final String clientRole) {
         return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
 
-    public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(String clientRole) throws IOException {
+    public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(final String clientRole) {
         if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
             log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
             throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
@@ -135,12 +141,12 @@ public boolean isSuperUser(String clientRole) {
         if (!isWorkerServiceAvailable()) {
             throw new WebApplicationException(
                     Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Function worker service is not avaialable")).build());
+                            .entity(new ErrorData("Function worker service is not available")).build());
         }
         return worker().getMetricsGenerator().generate();
     }
 
-    public Response getFunctionsMetrics(String clientRole) throws IOException {
+    public Response getFunctionsMetrics(final String clientRole) {
         if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
             log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
             return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
@@ -156,7 +162,7 @@ public Response getFunctionsMetrics(String clientRole) throws IOException {
         public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics;
     }
 
-    private Response getFunctionsMetrics() throws IOException {
+    private Response getFunctionsMetrics() {
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
@@ -171,7 +177,8 @@ private Response getFunctionsMetrics() throws IOException {
             String fullyQualifiedInstanceName = entry.getKey();
             FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
 
-            FunctionStats.FunctionInstanceStats functionInstanceStats = Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
+            FunctionStats.FunctionInstanceStats functionInstanceStats =
+                    Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
 
             WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
             workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message