pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sanjee...@apache.org
Subject [pulsar] branch master updated: Fix Spellings and Code Cleanup (#3181)
Date Thu, 13 Dec 2018 14:15:39 GMT
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6df595a  Fix Spellings and Code Cleanup (#3181)
6df595a is described below

commit 6df595a341a884404242b8d6dcf0d686e71907ee
Author: Ali Ahmed <alahmed.se@gmail.com>
AuthorDate: Thu Dec 13 06:15:34 2018 -0800

    Fix Spellings and Code Cleanup (#3181)
    
    * Fix Spellings and Code Cleanup
    
    * Fix Code comments
    
    * Remove Tenant check
    
    * Fix test cases
---
 .../pulsar/broker/admin/impl/BrokerStatsBase.java  |   2 +-
 .../pulsar/broker/admin/impl/BrokersBase.java      |   2 +-
 .../pulsar/broker/admin/impl/ClustersBase.java     |   4 +-
 .../broker/admin/impl/ResourceQuotasBase.java      |   4 +-
 .../apache/pulsar/broker/admin/v2/BrokerStats.java |   2 +-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |   2 +-
 .../org/apache/pulsar/admin/cli/CmdSources.java    |   2 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |   4 +-
 .../functions/worker/rest/api/ComponentImpl.java   | 246 ++++++++++++++-------
 .../functions/worker/rest/api/FunctionsImpl.java   |  22 +-
 .../worker/rest/api/FunctionsMetricsResource.java  |   3 +-
 .../pulsar/functions/worker/rest/api/SinkImpl.java |  33 ++-
 .../functions/worker/rest/api/SourceImpl.java      |  28 ++-
 .../functions/worker/rest/api/WorkerImpl.java      |  25 ++-
 14 files changed, 246 insertions(+), 133 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
index 03fc9a2..43b9ba4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
@@ -87,7 +87,7 @@ public class BrokerStatsBase extends AdminResource {
 
     @GET
     @Path("/destinations")
-    @ApiOperation(value = "Get all the topic stats by namesapce", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558
+    @ApiOperation(value = "Get all the topic stats by namespace", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558
                                                                                                                                            // map
                                                                                                                                            // support
                                                                                                                                            // missing
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index c4d504c..900ad7c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -117,7 +117,7 @@ public class BrokersBase extends AdminResource {
             @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
             @ApiResponse(code = 404, message = "Configuration not found"),
             @ApiResponse(code = 412, message = "Configuration can't be updated dynamically") })
-    public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{
+    public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception {
         validateSuperUserAccess();
         updateDynamicConfigurationOnZk(configName, configValue);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 3fcb7d6..b65d268 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -344,7 +344,7 @@ public class ClustersBase extends AdminResource {
                     .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
-            // construct the response to NamespaceisolationData map
+            // construct the response to Namespace isolation data map
             return nsIsolationPolicies.getPolicies();
         } catch (Exception e) {
             log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
@@ -368,7 +368,7 @@ public class ClustersBase extends AdminResource {
                     .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
-            // construct the response to NamespaceisolationData map
+            // construct the response to Namespace isolation data map
             if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
                 log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}", policyName, cluster);
                 throw new RestException(Status.NOT_FOUND,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
index 2c664f0..3f502da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
@@ -94,7 +94,7 @@ public abstract class ResourceQuotasBase extends NamespacesBase {
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to set resource quota for namespace bundle {}: concurrent modification",
                     clientAppId(), nsBundle.toString());
-            throw new RestException(Status.CONFLICT, "Cuncurrent modification on namespace bundle quota");
+            throw new RestException(Status.CONFLICT, "Concurrent modification on namespace bundle quota");
         } catch (Exception e) {
             log.error("[{}] Failed to set resource quota for namespace bundle {}", clientAppId(), nsBundle.toString());
             throw new RestException(e);
@@ -123,7 +123,7 @@ public abstract class ResourceQuotasBase extends NamespacesBase {
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to unset resource quota for namespace bundle {}: concurrent modification",
                     clientAppId(), nsBundle.toString());
-            throw new RestException(Status.CONFLICT, "Cuncurrent modification on namespace bundle quota");
+            throw new RestException(Status.CONFLICT, "Concurrent modification on namespace bundle quota");
         } catch (Exception e) {
             log.error("[{}] Failed to unset resource quota for namespace bundle {}", clientAppId(),
                     nsBundle.toString());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
index 5fd5783..a04fded 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
@@ -43,7 +43,7 @@ public class BrokerStats extends BrokerStatsBase {
     @GET
     @Path("/topics")
     @ApiOperation(
-            value = "Get all the topic stats by namesapce",
+            value = "Get all the topic stats by namespace",
             response = OutputStream.class,
             responseContainer = "OutputStream")
     // https://github.com/swagger-api/swagger-ui/issues/558
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 47ce6e4..5d1b9c0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -86,7 +86,7 @@ public class CmdSinks extends CmdBase {
         jcommander.addCommand("delete", deleteSink);
         jcommander.addCommand("list", listSinks);
         jcommander.addCommand("get", getSink);
-        // TODO depecreate getstatus
+        // TODO deprecate getstatus
         jcommander.addCommand("status", getSinkStatus, "getstatus");
         jcommander.addCommand("stop", stopSink);
         jcommander.addCommand("restart", restartSink);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 9ec950a..7acd85b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -382,7 +382,7 @@ public class CmdSources extends CmdBase {
 
         protected void validateSourceConfigs(SourceConfig sourceConfig) {
             if (isBlank(sourceConfig.getArchive())) {
-                throw new ParameterException("Source archive not specfied");
+                throw new ParameterException("Source archive not specified");
             }
             org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
             if (!Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) &&
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 2eb951a..bf9c079 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -178,7 +178,7 @@ public class TestCmdSources {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specfied")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specified")
     public void testMissingArchive() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
         sourceConfig.setArchive(null);
@@ -356,7 +356,7 @@ public class TestCmdSources {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specfied")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specified")
     public void testCmdSourceConfigFileMissingJar() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
         testSourceConfig.setArchive(null);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index dd2dce2..fa8f783 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -153,12 +153,16 @@ public abstract class ComponentImpl {
 
         public abstract T notScheduledInstance();
 
-        public abstract T fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String assignedWorkerId);
+        public abstract T fromFunctionStatusProto(final InstanceCommunication.FunctionStatus status,
+                                                  final String assignedWorkerId);
 
-        public abstract T notRunning(String assignedWorkerId, String error);
+        public abstract T notRunning(final String assignedWorkerId, final String error);
 
-        public T getComponentInstanceStatus(String tenant, String namespace,
-                                            String name, int instanceId, URI uri) {
+        public T getComponentInstanceStatus(final String tenant,
+                                            final String namespace,
+                                            final String name,
+                                            final int instanceId,
+                                            final URI uri) {
 
             Function.Assignment assignment;
             if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
@@ -218,16 +222,23 @@ public abstract class ComponentImpl {
             }
         }
 
-        public abstract S getStatus(String tenant, String namespace,
-                                    String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException;
+        public abstract S getStatus(final String tenant,
+                                    final String namespace,
+                                    final String name,
+                                    final Collection<Function.Assignment> assignments,
+                                    final URI uri) throws PulsarAdminException;
 
-        public abstract S getStatusExternal(String tenant, String namespace,
-                                            String name, int parallelism);
+        public abstract S getStatusExternal(final String tenant,
+                                            final String namespace,
+                                            final String name,
+                                            final int parallelism);
 
-        public abstract S emptyStatus(int parallelism);
+        public abstract S emptyStatus(final int parallelism);
 
-        public S getComponentStatus(String tenant, String namespace,
-                                    String name, URI uri) {
+        public S getComponentStatus(final String tenant,
+                                    final String namespace,
+                                    final String name,
+                                    final URI uri) {
 
             Function.FunctionMetaData functionMetaData = worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, name);
 
@@ -291,10 +302,14 @@ public abstract class ComponentImpl {
         return true;
     }
 
-    public Response registerFunction(final String tenant, final String namespace, final String componentName,
-                                     final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-                                     final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
-
+    public Response registerFunction(final String tenant,
+                                     final String namespace,
+                                     final String componentName,
+                                     final InputStream uploadedInputStream,
+                                     final FormDataContentDisposition fileDetail,
+                                     final String functionPkgUrl,
+                                     final String functionDetailsJson,
+                                     final String componentConfigJson,
                                      final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
@@ -315,9 +330,11 @@ public abstract class ComponentImpl {
         }
 
         try {
-            TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
-            String qualifedNamespace = tenant + "/" + namespace;
-            if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifedNamespace)) {
+            // Check tenant exists
+            final TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifiedNamespace)) {
                 log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace,
                         componentName, namespace);
                 return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -409,10 +426,10 @@ public abstract class ComponentImpl {
         return updateRequest(functionMetaDataBuilder.build());
     }
 
-    private PackageLocationMetaData.Builder getFunctionPackageLocation(FunctionDetails functionDetails,
-                                                                       String functionPkgUrl,
+    private PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionDetails functionDetails,
+                                                                       final String functionPkgUrl,
                                                                        final FormDataContentDisposition fileDetail,
-                                                                       File uploadedInputStreamAsFile) throws Exception {
+                                                                       final File uploadedInputStreamAsFile) throws Exception {
         String tenant = functionDetails.getTenant();
         String namespace = functionDetails.getNamespace();
         String componentName = functionDetails.getName();
@@ -466,9 +483,14 @@ public abstract class ComponentImpl {
     }
 
 
-    public Response updateFunction(final String tenant, final String namespace, final String componentName,
-                                   final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-                                   final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
+    public Response updateFunction(final String tenant,
+                                   final String namespace,
+                                   final String componentName,
+                                   final InputStream uploadedInputStream,
+                                   final FormDataContentDisposition fileDetail,
+                                   final String functionPkgUrl,
+                                   final String functionDetailsJson,
+                                   final String componentConfigJson,
                                    final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
@@ -516,7 +538,7 @@ public abstract class ComponentImpl {
             FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new Gson().toJson(existingFunctionConfig);
             FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            // The rest end points take precendence over whatever is there in functionconfig
+            // The rest end points take precedence over whatever is there in functionconfig
             functionConfig.setTenant(tenant);
             functionConfig.setNamespace(namespace);
             functionConfig.setName(componentName);
@@ -531,7 +553,7 @@ public abstract class ComponentImpl {
             SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new Gson().toJson(existingSourceConfig);
             SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
-            // The rest end points take precendence over whatever is there in functionconfig
+            // The rest end points take precedence over whatever is there in functionconfig
             sourceConfig.setTenant(tenant);
             sourceConfig.setNamespace(namespace);
             sourceConfig.setName(componentName);
@@ -546,7 +568,7 @@ public abstract class ComponentImpl {
             SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new Gson().toJson(existingSinkConfig);
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
-            // The rest end points take precendence over whatever is there in functionconfig
+            // The rest end points take precedence over whatever is there in functionconfig
             sinkConfig.setTenant(tenant);
             sinkConfig.setNamespace(namespace);
             sinkConfig.setName(componentName);
@@ -570,7 +592,6 @@ public abstract class ComponentImpl {
         if (uploadedInputStream != null) {
             uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream);
         }
-        File existingPackageAsFile = null;
 
         // validate parameters
         try {
@@ -581,7 +602,8 @@ public abstract class ComponentImpl {
                 functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
                         fileDetail, functionDetailsJson, mergedComponentConfigJson, componentType);
             } else {
-                functionDetails = validateUpdateRequestParamsWithExistingMetadata(tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType);
+                functionDetails = validateUpdateRequestParamsWithExistingMetadata(
+                        tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType);
             }
         } catch (Exception e) {
             log.error("Invalid update {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
@@ -618,8 +640,10 @@ public abstract class ComponentImpl {
         return updateRequest(functionMetaDataBuilder.build());
     }
 
-    public Response deregisterFunction(final String tenant, final String namespace, final String componentName,
-                                       String clientRole) {
+    public Response deregisterFunction(final String tenant,
+                                       final String namespace,
+                                       final String componentName,
+                                       final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -702,8 +726,9 @@ public abstract class ComponentImpl {
         return Response.status(Status.OK).entity(requestResult.toJson()).build();
     }
 
-    public Response getFunctionInfo(final String tenant, final String namespace, final String componentName)
-            throws IOException {
+    public Response getFunctionInfo(final String tenant,
+                                    final String namespace,
+                                    final String componentName) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -731,32 +756,42 @@ public abstract class ComponentImpl {
                     .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
 
-        String retval;
+        String retVal;
         if (componentType.equals(FUNCTION)) {
             FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retval = new Gson().toJson(config);
+            retVal = new Gson().toJson(config);
         } else if (componentType.equals(SOURCE)) {
             SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retval = new Gson().toJson(config);
+            retVal = new Gson().toJson(config);
         } else {
             SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retval = new Gson().toJson(config);
+            retVal = new Gson().toJson(config);
         }
-        return Response.status(Status.OK).entity(retval).build();
+        return Response.status(Status.OK).entity(retVal).build();
     }
 
-    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
-                                         final String instanceId, URI uri) {
+    public Response stopFunctionInstance(final String tenant,
+                                         final String namespace,
+                                         final String componentName,
+                                         final String instanceId,
+                                         final URI uri) {
         return stopFunctionInstance(tenant, namespace, componentName, instanceId, false, uri);
     }
 
-    public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName,
-                                            final String instanceId, URI uri) {
+    public Response restartFunctionInstance(final String tenant,
+                                            final String namespace,
+                                            final String componentName,
+                                            final String instanceId,
+                                            final URI uri) {
         return stopFunctionInstance(tenant, namespace, componentName, instanceId, true, uri);
     }
 
-    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
-                                         final String instanceId, boolean restart, URI uri) {
+    public Response stopFunctionInstance(final String tenant,
+                                         final String namespace,
+                                         final String componentName,
+                                         final String instanceId,
+                                         final boolean restart,
+                                         final URI uri) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -797,16 +832,22 @@ public abstract class ComponentImpl {
         }
     }
 
-    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName) {
+    public Response stopFunctionInstances(final String tenant,
+                                          final String namespace,
+                                          final String componentName) {
         return stopFunctionInstances(tenant, namespace, componentName, false);
     }
 
-    public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName) {
+    public Response restartFunctionInstances(final String tenant,
+                                             final String namespace,
+                                             final String componentName) {
         return stopFunctionInstances(tenant, namespace, componentName, true);
     }
 
-    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
-                                          boolean restart) {
+    public Response stopFunctionInstances(final String tenant,
+                                          final String namespace,
+                                          final String componentName,
+                                          final boolean restart) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -846,8 +887,10 @@ public abstract class ComponentImpl {
         }
     }
 
-    public FunctionStats getFunctionStats(final String tenant, final String namespace, final String componentName,
-                                          URI uri) throws IOException {
+    public FunctionStats getFunctionStats(final String tenant,
+                                          final String namespace,
+                                          final String componentName,
+                                          final URI uri) {
         if (!isWorkerServiceAvailable()) {
             throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
         }
@@ -887,8 +930,11 @@ public abstract class ComponentImpl {
 
     }
 
-    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(final String tenant, final String namespace, final String componentName,
-                                                                                                   String instanceId, URI uri) {
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(final String tenant,
+                                                                                                   final String namespace,
+                                                                                                   final String componentName,
+                                                                                                   final String instanceId,
+                                                                                                   final URI uri) {
         if (!isWorkerServiceAvailable()) {
             throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
         }
@@ -997,14 +1043,17 @@ public abstract class ComponentImpl {
         return this.worker().getConnectorsManager().getConnectors();
     }
 
-    public Response triggerFunction(final String tenant, final String namespace, final String functionName,
-                                    final String input, final InputStream uploadedInputStream, final String topic) {
+    public Response triggerFunction(final String tenant,
+                                    final String namespace,
+                                    final String functionName,
+                                    final String input,
+                                    final InputStream uploadedInputStream,
+                                    final String topic) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
 
-        FunctionDetails functionDetails;
         // validate parameters
         try {
             validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
@@ -1098,8 +1147,10 @@ public abstract class ComponentImpl {
         }
     }
 
-    public Response getFunctionState(final String tenant, final String namespace,
-                                     final String functionName, final String key) {
+    public Response getFunctionState(final String tenant,
+                                     final String namespace,
+                                     final String functionName,
+                                     final String key) {
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
@@ -1209,7 +1260,7 @@ public abstract class ComponentImpl {
         }).build();
     }
 
-    private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
+    private void validateListFunctionRequestParams(final String tenant, final String namespace) throws IllegalArgumentException {
 
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
@@ -1219,8 +1270,11 @@ public abstract class ComponentImpl {
         }
     }
 
-    protected void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName,
-                                                          ComponentType componentType, String instanceId) throws IllegalArgumentException {
+    protected void validateGetFunctionInstanceRequestParams(final String tenant,
+                                                            final String namespace,
+                                                            final String componentName,
+                                                            final ComponentType componentType,
+                                                            final String instanceId) throws IllegalArgumentException {
         validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         if (instanceId == null) {
             throw new IllegalArgumentException(String.format("%s Instance Id is not provided", componentType));
@@ -1255,10 +1309,14 @@ public abstract class ComponentImpl {
         }
     }
 
-    private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String componentName,
-                                                                  String functionPkgUrl, String functionDetailsJson, String componentConfigJson,
-                                                                  ComponentType componentType)
-            throws IllegalArgumentException, IOException, URISyntaxException {
+    private FunctionDetails validateUpdateRequestParamsWithPkgUrl(final String tenant,
+                                                                  final String namespace,
+                                                                  final String componentName,
+                                                                  final String functionPkgUrl,
+                                                                  final String functionDetailsJson,
+                                                                  final String componentConfigJson,
+                                                                  final ComponentType componentType)
+            throws IllegalArgumentException, IOException {
         if (!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
             throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
         }
@@ -1267,9 +1325,14 @@ public abstract class ComponentImpl {
         return functionDetails;
     }
 
-    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
-                                                        File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson,
-                                                        String componentConfigJson, ComponentType componentType)
+    private FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                        final String namespace,
+                                                        final String componentName,
+                                                        final File uploadedInputStreamAsFile,
+                                                        final FormDataContentDisposition fileDetail,
+                                                        final String functionDetailsJson,
+                                                        final String componentConfigJson,
+                                                        final ComponentType componentType)
             throws IllegalArgumentException, IOException {
 
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
@@ -1281,9 +1344,12 @@ public abstract class ComponentImpl {
         return functionDetails;
     }
 
-    private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(String tenant, String namespace, String componentName,
-                                                                            PackageLocationMetaData packageLocationMetaData,
-                                                                            String componentConfigJson, ComponentType componentType) throws Exception {
+    private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(final String tenant,
+                                                                            final String namespace,
+                                                                            final String componentName,
+                                                                            final PackageLocationMetaData packageLocationMetaData,
+                                                                            final String componentConfigJson,
+                                                                            final ComponentType componentType) throws Exception {
         File tmpFile = File.createTempFile("functions", null);
         tmpFile.deleteOnExit();
         Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, packageLocationMetaData.getPackagePath());
@@ -1291,7 +1357,7 @@ public abstract class ComponentImpl {
                 null, componentConfigJson, componentType, null, tmpFile);
     }
 
-    private static File dumpToTmpFile(InputStream uploadedInputStream) {
+    private static File dumpToTmpFile(final InputStream uploadedInputStream) {
         try {
             File tmpFile = File.createTempFile("functions", null);
             tmpFile.deleteOnExit();
@@ -1302,7 +1368,10 @@ public abstract class ComponentImpl {
         }
     }
 
-    private void validateGetFunctionStateParams(String tenant, String namespace, String functionName, String key)
+    private void validateGetFunctionStateParams(final String tenant,
+                                                final String namespace,
+                                                final String functionName,
+                                                final String key)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -1355,9 +1424,14 @@ public abstract class ComponentImpl {
         return null;
     }
 
-    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
-                                                        String functionDetailsJson, String componentConfigJson, ComponentType componentType,
-                                                        String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException {
+    private FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                        final String namespace,
+                                                        final String componentName,
+                                                        final String functionDetailsJson,
+                                                        final String componentConfigJson,
+                                                        final ComponentType componentType,
+                                                        final String functionPkgUrl,
+                                                        final File uploadedInputStreamAsFile) throws IOException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -1370,7 +1444,7 @@ public abstract class ComponentImpl {
 
         if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
             FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            // The rest end points take precendence over whatever is there in functionconfig
+            // The rest end points take precedence over whatever is there in functionconfig
             functionConfig.setTenant(tenant);
             functionConfig.setNamespace(namespace);
             functionConfig.setName(componentName);
@@ -1381,7 +1455,7 @@ public abstract class ComponentImpl {
         if (componentType.equals(SOURCE)) {
             Path archivePath = null;
             SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
-            // The rest end points take precendence over whatever is there in sourceconfig
+            // The rest end points take precedence over whatever is there in sourceconfig
             sourceConfig.setTenant(tenant);
             sourceConfig.setNamespace(namespace);
             sourceConfig.setName(componentName);
@@ -1403,7 +1477,7 @@ public abstract class ComponentImpl {
         if (componentType.equals(SINK)) {
             Path archivePath = null;
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
-            // The rest end points take precendence over whatever is there in sinkConfig
+            // The rest end points take precedence over whatever is there in sinkConfig
             sinkConfig.setTenant(tenant);
             sinkConfig.setNamespace(namespace);
             sinkConfig.setName(componentName);
@@ -1562,8 +1636,14 @@ public abstract class ComponentImpl {
         return TypeResolver.resolveRawArgument(funClass, loadedClass);
     }
 
-    private void validateTriggerRequestParams(String tenant, String namespace, String functionName, String topic,
-                                              String input, InputStream uploadedInputStream) {
+    private void validateTriggerRequestParams(final String tenant,
+                                              final String namespace,
+                                              final String functionName,
+                                              final String topic,
+                                              final String input,
+                                              final InputStream uploadedInputStream) {
+        // Note : Checking topic is not required it can be null
+
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -1660,8 +1740,10 @@ public abstract class ComponentImpl {
         }
     }
 
-    protected void componentInstanceStatusRequestValidate (final String tenant, final String namespace,
-                                                           final String componentName, int instanceId) {
+    protected void componentInstanceStatusRequestValidate (final String tenant,
+                                                           final String namespace,
+                                                           final String componentName,
+                                                           final int instanceId) {
         componentStatusRequestValidate(tenant, namespace, componentName);
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 9a6f739..f3e41a1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -24,13 +24,11 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -155,7 +153,10 @@ public class FunctionsImpl extends ComponentImpl {
         }
 
         @Override
-        public FunctionStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
+        public FunctionStatus getStatusExternal(final String tenant,
+                                                final String namespace,
+                                                final String name,
+                                                final int parallelism) {
             FunctionStatus functionStatus = new FunctionStatus();
             for (int i = 0; i < parallelism; ++i) {
                 FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
@@ -177,7 +178,7 @@ public class FunctionsImpl extends ComponentImpl {
         }
 
         @Override
-        public FunctionStatus emptyStatus(int parallelism) {
+        public FunctionStatus emptyStatus(final int parallelism) {
             FunctionStatus functionStatus = new FunctionStatus();
             functionStatus.setNumInstances(parallelism);
             functionStatus.setNumRunning(0);
@@ -209,8 +210,11 @@ public class FunctionsImpl extends ComponentImpl {
      * @param instanceId the function instance id
      * @return the function status
      */
-    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName,
-                                                                                                      final String instanceId, URI uri) throws IOException {
+    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(final String tenant,
+                                                                                                      final String namespace,
+                                                                                                      final String componentName,
+                                                                                                      final String instanceId,
+                                                                                                      final URI uri) {
 
         // validate parameters
         componentInstanceStatusRequestValidate(tenant, namespace, componentName, Integer.parseInt(instanceId));
@@ -237,8 +241,10 @@ public class FunctionsImpl extends ComponentImpl {
      * @return a list of function statuses
      * @throws PulsarAdminException
      */
-    public FunctionStatus getFunctionStatus(final String tenant, final String namespace, final String componentName,
-                                            URI uri) {
+    public FunctionStatus getFunctionStatus(final String tenant,
+                                            final String namespace,
+                                            final String componentName,
+                                            final URI uri) {
 
         // validate parameters
         componentStatusRequestValidate(tenant, namespace, componentName);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
index b94dfd8..9d7b316 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
@@ -38,7 +37,7 @@ public class FunctionsMetricsResource extends FunctionApiResource {
     @Path("metrics")
     @GET
     @Produces(MediaType.TEXT_PLAIN)
-    public Response getMetrics() throws JsonProcessingException {
+    public Response getMetrics() {
 
         WorkerService workerService = get();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 6d70fbb..862aaf6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -24,13 +24,11 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -123,13 +121,18 @@ public class SinkImpl extends ComponentImpl {
         }
 
         @Override
-        public SinkStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException {
+        public SinkStatus getStatus(final String tenant,
+                                    final String namespace,
+                                    final String name,
+                                    final Collection<Function.Assignment> assignments,
+                                    final URI uri) throws PulsarAdminException {
             SinkStatus sinkStatus = new SinkStatus();
             for (Function.Assignment assignment : assignments) {
                 boolean isOwner = worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
                 SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData;
                 if (isOwner) {
-                    sinkInstanceStatusData = getComponentInstanceStatus(tenant, namespace, name, assignment.getInstance().getInstanceId(), null);
+                    sinkInstanceStatusData = getComponentInstanceStatus(tenant,
+                            namespace, name, assignment.getInstance().getInstanceId(), null);
                 } else {
                     sinkInstanceStatusData = worker().getFunctionAdmin().sink().getSinkStatus(
                             assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
@@ -154,7 +157,10 @@ public class SinkImpl extends ComponentImpl {
         }
 
         @Override
-        public SinkStatus getStatusExternal (String tenant, String namespace, String name, int parallelism) {
+        public SinkStatus getStatusExternal(final String tenant,
+                                            final String namespace,
+                                            final String name,
+                                            final int parallelism) {
             SinkStatus sinkStatus = new SinkStatus();
             for (int i = 0; i < parallelism; ++i) {
                 SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData
@@ -176,7 +182,7 @@ public class SinkImpl extends ComponentImpl {
         }
 
         @Override
-        public SinkStatus emptyStatus(int parallelism) {
+        public SinkStatus emptyStatus(final int parallelism) {
             SinkStatus sinkStatus = new SinkStatus();
             sinkStatus.setNumInstances(parallelism);
             sinkStatus.setNumRunning(0);
@@ -200,9 +206,11 @@ public class SinkImpl extends ComponentImpl {
         super(workerServiceSupplier, ComponentType.SINK);
     }
 
-    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(
-            String tenant, String namespace, String sinkName, String instanceId, URI uri)
-            throws IOException {
+    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant,
+                                                                                      final String namespace,
+                                                                                      final String sinkName,
+                                                                                      final String instanceId,
+                                                                                      final URI uri) {
 
         // validate parameters
         componentInstanceStatusRequestValidate(tenant, namespace, sinkName, Integer.parseInt(instanceId));
@@ -221,9 +229,10 @@ public class SinkImpl extends ComponentImpl {
         return sinkInstanceStatusData;
     }
 
-    public SinkStatus getSinkStatus(
-            final String tenant, final String namespace,
-            final String componentName, URI uri) {
+    public SinkStatus getSinkStatus(final String tenant,
+                                    final String namespace,
+                                    final String componentName,
+                                    final URI uri) {
 
         // validate parameters
         componentStatusRequestValidate(tenant, namespace, componentName);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index 8c3c988..d915bb0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -24,13 +24,11 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -122,7 +120,11 @@ public class SourceImpl extends ComponentImpl {
         }
 
         @Override
-        public SourceStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException {
+        public SourceStatus getStatus(final String tenant,
+                                      final String namespace,
+                                      final String name,
+                                      final Collection<Function.Assignment> assignments,
+                                      final URI uri) throws PulsarAdminException {
             SourceStatus sourceStatus = new SourceStatus();
             for (Function.Assignment assignment : assignments) {
                 boolean isOwner = worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
@@ -153,7 +155,10 @@ public class SourceImpl extends ComponentImpl {
         }
 
         @Override
-        public SourceStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
+        public SourceStatus getStatusExternal(final String tenant,
+                                              final String namespace,
+                                              final String name,
+                                              final int parallelism) {
             SourceStatus sinkStatus = new SourceStatus();
             for (int i = 0; i < parallelism; ++i) {
                 SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData
@@ -175,7 +180,7 @@ public class SourceImpl extends ComponentImpl {
         }
 
         @Override
-        public SourceStatus emptyStatus(int parallelism) {
+        public SourceStatus emptyStatus(final int parallelism) {
             SourceStatus sourceStatus = new SourceStatus();
             sourceStatus.setNumInstances(parallelism);
             sourceStatus.setNumRunning(0);
@@ -199,8 +204,10 @@ public class SourceImpl extends ComponentImpl {
         super(workerServiceSupplier, ComponentType.SOURCE);
     }
 
-    public SourceStatus getSourceStatus(final String tenant, final String namespace,
-                                        final String componentName, URI uri) throws IOException {
+    public SourceStatus getSourceStatus(final String tenant,
+                                        final String namespace,
+                                        final String componentName,
+                                        final URI uri) {
         // validate parameters
         componentStatusRequestValidate(tenant, namespace, componentName);
 
@@ -217,8 +224,11 @@ public class SourceImpl extends ComponentImpl {
         return sourceStatus;
     }
 
-    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(
-            String tenant, String namespace, String sourceName, String instanceId, URI uri) {
+    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(final String tenant,
+                                                                                              final String namespace,
+                                                                                              final String sourceName,
+                                                                                              final String instanceId,
+                                                                                              final URI uri) {
         // validate parameters
         componentInstanceStatusRequestValidate(tenant, namespace, sourceName, Integer.parseInt(instanceId));
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index bafb7a9..56c945d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -27,14 +27,20 @@ import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.worker.*;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.MembershipManager;
+import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerService;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
-import java.io.*;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -118,11 +124,11 @@ public class WorkerImpl {
                 .build();
     }
 
-    public boolean isSuperUser(String clientRole) {
+    public boolean isSuperUser(final String clientRole) {
         return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
 
-    public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(String clientRole) throws IOException {
+    public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(final String clientRole) {
         if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
             log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
             throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
@@ -135,12 +141,12 @@ public class WorkerImpl {
         if (!isWorkerServiceAvailable()) {
             throw new WebApplicationException(
                     Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Function worker service is not avaialable")).build());
+                            .entity(new ErrorData("Function worker service is not available")).build());
         }
         return worker().getMetricsGenerator().generate();
     }
 
-    public Response getFunctionsMetrics(String clientRole) throws IOException {
+    public Response getFunctionsMetrics(final String clientRole) {
         if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
             log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
             return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
@@ -156,7 +162,7 @@ public class WorkerImpl {
         public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics;
     }
 
-    private Response getFunctionsMetrics() throws IOException {
+    private Response getFunctionsMetrics() {
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
@@ -171,7 +177,8 @@ public class WorkerImpl {
             String fullyQualifiedInstanceName = entry.getKey();
             FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
 
-            FunctionStats.FunctionInstanceStats functionInstanceStats = Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
+            FunctionStats.FunctionInstanceStats functionInstanceStats =
+                    Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
 
             WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
             workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);


Mime
View raw message