pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] rdhabalia closed pull request #2213: Add authorization support on function apis
Date Tue, 24 Jul 2018 03:18:27 GMT
rdhabalia closed pull request #2213: Add authorization support on function apis
URL: https://github.com/apache/incubator-pulsar/pull/2213
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 58bcf1dfd3..4194337e4a 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -45,3 +45,11 @@ initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 30000
 metricsSamplingPeriodSec: 60
+# Enforce authentication
+authenticationEnabled: false
+# Enforce authorization on accessing functions api
+authorizationEnabled: false
+# Set of autentication provider name list, which is a list of class names
+authenticationProviders: 
+# Set of role names that are treated as "super-user", meaning they will be able to access
any admin-api
+superUserRoles: 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 6338ce98f2..f97f1807c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -82,8 +82,8 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
                                      final @FormDataParam("url") String functionPkgUrl,
                                      final @FormDataParam("functionDetails") String functionDetailsJson)
{
 
-        return functions.registerFunction(
-            tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl,
functionDetailsJson);
+        return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream,
fileDetail,
+                functionPkgUrl, functionDetailsJson, clientAppId());
     }
 
     @PUT
@@ -103,8 +103,8 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("url") String functionPkgUrl,
                                    final @FormDataParam("functionDetails") String functionDetailsJson)
{
 
-        return functions.updateFunction(
-            tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl,
functionDetailsJson);
+        return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream,
fileDetail,
+                functionPkgUrl, functionDetailsJson, clientAppId());
 
     }
 
@@ -122,8 +122,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
     public Response deregisterFunction(final @PathParam("tenant") String tenant,
                                        final @PathParam("namespace") String namespace,
                                        final @PathParam("functionName") String functionName)
{
-        return functions.deregisterFunction(
-            tenant, namespace, functionName);
+        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index a7b34ae857..16f1a76597 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -77,6 +77,7 @@
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Maps;
@@ -99,7 +100,6 @@
     PulsarAdmin admin;
     PulsarClient pulsarClient;
     BrokerStats brokerStatsClient;
-    WorkerServer functionsWorkerServer;
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
     String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
@@ -120,6 +120,11 @@
 
     private static final Logger log = LoggerFactory.getLogger(PulsarSinkE2ETest.class);
 
+    @DataProvider(name = "validRoleName")
+    public Object[][] validRoleName() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+    
     @BeforeMethod
     void setup(Method method) throws Exception {
 
@@ -187,6 +192,7 @@ void setup(Method method) throws Exception {
         pulsarClient = clientBuilder.build();
        
         TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add("superUser");
         propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
         admin.tenants().updateTenant(tenant, propAdmin);
        
@@ -231,6 +237,9 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration
config) {
         workerConfig.setUseTls(true);
         workerConfig.setTlsAllowInsecureConnection(true);
         workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+        
+        workerConfig.setAuthenticationEnabled(true);
+        workerConfig.setAuthorizationEnabled(true);
 
         return new WorkerService(workerConfig);
     }
@@ -416,4 +425,34 @@ protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String
 
         return functionDetailsBuilder.build();
     }
+    
+    @Test(dataProvider = "validRoleName")
+    public void testAuthorization(boolean validRoleName) throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        String roleName = validRoleName ? "superUser" : "invalid";
+        TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add(roleName);
+        propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+        admin.tenants().updateTenant(tenant, propAdmin);
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion,
functionName,
+                sinkTopic, subscriptionName);
+        try {
+            admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+            Assert.assertTrue(validRoleName);
+        } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException
ne) {
+            Assert.assertFalse(validRoleName);
+        }
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 9b10cbe55a..029f573d76 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -34,6 +34,12 @@
 
   <dependencies>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-functions-runtime</artifactId>
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 09f54ef424..eda9b1515f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -20,14 +20,18 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.collect.Sets;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.configuration.PulsarConfiguration;
 
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -42,7 +46,7 @@
 @EqualsAndHashCode
 @ToString
 @Accessors(chain = true)
-public class WorkerConfig implements Serializable {
+public class WorkerConfig implements Serializable, PulsarConfiguration {
 
     private static final long serialVersionUID = 1L;
 
@@ -74,6 +78,17 @@
     private boolean tlsAllowInsecureConnection = false;
     private boolean tlsHostnameVerificationEnable = false;
     private int metricsSamplingPeriodSec = 60;
+    // Enforce authentication
+    private boolean authenticationEnabled = false;
+    // Autentication provider name list, which is a list of class names
+    private Set<String> authenticationProviders = Sets.newTreeSet();
+    // Enforce authorization on accessing functions admin-api
+    private boolean authorizationEnabled = false;
+    // Role names that are treated as "super-user", meaning they will be able to access any
admin-api
+    private Set<String> superUserRoles = Sets.newTreeSet();
+    
+    private Properties properties = new Properties();
+
 
     @Data
     @Setter
@@ -135,4 +150,9 @@ public static String unsafeLocalhostResolve() {
             throw new IllegalStateException("Failed to resolve localhost name.", ex);
         }
     }
+  
+    @Override
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 29dff537f1..6af9c8f2bf 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -35,9 +35,12 @@
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 
 /**
  * A service component contains everything to run a worker except rest server.
@@ -57,8 +60,9 @@
     private SchedulerManager schedulerManager;
     private boolean isInitialized = false;
     private final ScheduledExecutorService statsUpdater;
-
+    private AuthenticationService authenticationService;
     private ConnectorsManager connectorsManager;
+    private PulsarAdmin admin;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
@@ -68,6 +72,11 @@ public WorkerService(WorkerConfig workerConfig) {
 
     public void start(URI dlogUri) throws InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
+        
+        this.admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+                workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
+                workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
+        
         try {
             log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter()
                     .writeValueAsString(workerConfig));
@@ -128,6 +137,8 @@ public void start(URI dlogUri) throws InterruptedException {
 
             // initialize function metadata manager
             this.functionMetaDataManager.initialize();
+            
+            authenticationService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
 
             // Starting cluster services
             log.info("Start cluster services...");
@@ -200,6 +211,10 @@ public void stop() {
         if (null != schedulerManager) {
             schedulerManager.close();
         }
+        
+        if (null != this.admin) {
+            this.admin.close();
+        }
     }
 
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 1c5c739982..4673d5645f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -18,9 +18,13 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
+import java.util.Optional;
 import java.util.function.Supplier;
 import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Context;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 
@@ -32,6 +36,8 @@
     private WorkerService workerService;
     @Context
     protected ServletContext servletContext;
+    @Context
+    protected HttpServletRequest httpRequest;
 
     public FunctionApiResource() {
         this.functions = new FunctionsImpl(this);
@@ -44,4 +50,10 @@ public synchronized WorkerService get() {
         }
         return this.workerService;
     }
+
+    public String clientAppId() {
+        return httpRequest != null
+                ? (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
+                : null;
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 524a6ad295..a57a9529fa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -19,8 +19,15 @@
 package org.apache.pulsar.functions.worker.rest;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
+
+import javax.servlet.DispatcherType;
+
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.eclipse.jetty.server.Handler;
@@ -28,9 +35,11 @@
 
 import java.net.BindException;
 import java.net.URI;
+
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.glassfish.jersey.server.ResourceConfig;
@@ -41,6 +50,7 @@
 
     private final WorkerConfig workerConfig;
     private final WorkerService workerService;
+    private static final String MATCH_ALL = "/*";
 
     private static String getErrorMessage(Server server, int port, Exception ex) {
         if (ex instanceof BindException) {
@@ -106,7 +116,12 @@ public static ServletContextHandler newServletContextHandler(String contextPath,
         final ServletHolder apiServlet =
                 new ServletHolder(new ServletContainer(config));
         contextHandler.addServlet(apiServlet, "/*");
+        if (workerService.getWorkerConfig().isAuthenticationEnabled()) {
+            FilterHolder filter = new FilterHolder(new AuthenticationFilter(workerService.getAuthenticationService()));
+            contextHandler.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+        }
 
         return contextHandler;
     }
+    
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 2ac48dc960..b935bf5f58 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -57,12 +57,14 @@
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -117,11 +119,24 @@ private boolean isWorkerServiceAvailable() {
 
     public Response registerFunction(final String tenant, final String namespace, final String
functionName,
             final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-            final String functionPkgUrl, final String functionDetailsJson) {
+            final String functionPkgUrl, final String functionDetailsJson, final String clientRole)
{
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
+        
+        try {
+            if (!isAuthorizedRole(tenant, clientRole)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register function",
tenant, namespace, functionName,
+                        clientRole);
+                return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData("client is not authorize to perform operation")).build();
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName,
e);
+            return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
 
         FunctionDetails functionDetails;
         boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
@@ -168,12 +183,25 @@ public Response registerFunction(final String tenant, final String namespace,
fi
 
     public Response updateFunction(final String tenant, final String namespace, final String
functionName,
             final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-            final String functionPkgUrl, final String functionDetailsJson) {
+            final String functionPkgUrl, final String functionDetailsJson, final String clientRole)
{
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, clientRole)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to update function",
tenant, namespace,
+                        functionName, clientRole);
+                return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData("client is not authorize to perform operation")).build();
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName,
e);
+            return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+        
         FunctionDetails functionDetails;
         boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
         // validate parameters
@@ -217,12 +245,26 @@ public Response updateFunction(final String tenant, final String namespace,
fina
                 : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
     }
 
-    public Response deregisterFunction(final String tenant, final String namespace, final
String functionName) {
+    public Response deregisterFunction(final String tenant, final String namespace, final
String functionName,
+            String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, clientRole)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister
function", tenant, namespace,
+                        functionName, clientRole);
+                return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData("client is not authorize to perform operation")).build();
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName,
e);
+            return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+        
         // validate parameters
         try {
             validateDeregisterRequestParams(tenant, namespace, functionName);
@@ -893,5 +935,18 @@ public static String createPackagePath(String tenant, String namespace,
String f
         return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName),
                 Utils.getUniquePackageName(Codec.encode(fileName)));
     }
+    
+    private boolean isAuthorizedRole(String tenant, String clientRole) throws PulsarAdminException
{
+        if (worker().getWorkerConfig().isAuthorizationEnabled()) {
+            // skip authorization if client role is super-user
+            if (clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole))
{
+                return true;
+            }
+            TenantInfo tenantInfo = worker().getAdmin().tenants().getTenantInfo(tenant);
+            return clientRole != null && (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()
+                    || tenantInfo.getAdminRoles().contains(clientRole));
+        }
+        return true;
+    }
 
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index a23c41bc1e..92957b32d4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -62,8 +62,8 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
                                      final @FormDataParam("url") String functionPkgUrl,
                                      final @FormDataParam("functionDetails") String functionDetailsJson)
{
 
-        return functions.registerFunction(
-            tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl,
functionDetailsJson);
+        return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream,
fileDetail,
+                functionPkgUrl, functionDetailsJson, clientAppId());
 
     }
 
@@ -78,8 +78,8 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("url") String functionPkgUrl,
                                    final @FormDataParam("functionDetails") String functionDetailsJson)
{
 
-        return functions.updateFunction(
-            tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl,
functionDetailsJson);
+        return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream,
fileDetail,
+                functionPkgUrl, functionDetailsJson, clientAppId());
 
     }
 
@@ -87,10 +87,8 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
     @DELETE
     @Path("/{tenant}/{namespace}/{functionName}")
     public Response deregisterFunction(final @PathParam("tenant") String tenant,
-                                       final @PathParam("namespace") String namespace,
-                                       final @PathParam("functionName") String functionName)
{
-        return functions.deregisterFunction(
-            tenant, namespace, functionName);
+            final @PathParam("namespace") String namespace, final @PathParam("functionName")
String functionName) {
+        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
     }
 
     @GET
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 866b92e8ed..c59d03dde4 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -314,7 +314,8 @@ private void testRegisterFunctionMissingArguments(
                 inputStream,
                 details,
                 null,
-                org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+                org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+                null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         if (missingFieldName.equals("parallelism")) {
@@ -342,7 +343,8 @@ private Response registerDefaultFunction() throws IOException {
             mockedInputStream,
             mockedFormData,
             null,
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null);
     }
 
     @Test
@@ -587,7 +589,8 @@ private void testUpdateFunctionMissingArguments(
             inputStream,
             details,
             null,
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         if (missingFieldName.equals("parallelism")) {
@@ -615,7 +618,8 @@ private Response updateDefaultFunction() throws IOException {
             mockedInputStream,
             mockedFormData,
             null,
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null);
     }
 
     @Test
@@ -696,7 +700,8 @@ public void testUpdateFunctionWithUrl() throws IOException {
             null,
             null,
             filePackageUrl,
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
@@ -783,7 +788,8 @@ private void testDeregisterFunctionMissingArguments(
         Response response = resource.deregisterFunction(
             tenant,
             namespace,
-            function);
+            function,
+            null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData)
response.getEntity()).reason);
@@ -793,7 +799,8 @@ private Response deregisterDefaultFunction() {
         return resource.deregisterFunction(
             tenant,
             namespace,
-            function);
+            function,
+            null);
     }
 
     @Test
@@ -1043,7 +1050,7 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException
{
                         .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
                 .build();
         Response response = resource.registerFunction(tenant, namespace, function, null,
null, filePackageUrl,
-                org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+                org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
@@ -1068,7 +1075,7 @@ public void testRegisterFunctionFileUrlWithInValidSinkClass() throws
IOException
                         .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
                 .build();
         Response response = resource.registerFunction(tenant, namespace, function, null,
null, filePackageUrl,
-                org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+                org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message