From commits-return-11641-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Tue Jul 24 05:18:30 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 141A5180647 for ; Tue, 24 Jul 2018 05:18:28 +0200 (CEST) Received: (qmail 57558 invoked by uid 500); 24 Jul 2018 03:18:27 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 57548 invoked by uid 99); 24 Jul 2018 03:18:27 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jul 2018 03:18:27 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] rdhabalia closed pull request #2213: Add authorization support on function apis Message-ID: <153240230724.898.2873797667946031302.gitbox@gitbox.apache.org> Date: Tue, 24 Jul 2018 03:18:27 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit rdhabalia closed pull request #2213: Add authorization support on function apis URL: https://github.com/apache/incubator-pulsar/pull/2213 This is a PR merged from a forked repository. 
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 58bcf1dfd3..4194337e4a 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -45,3 +45,11 @@ initialBrokerReconnectMaxRetries: 60 assignmentWriteMaxRetries: 60 instanceLivenessCheckFreqMs: 30000 metricsSamplingPeriodSec: 60 +# Enforce authentication +authenticationEnabled: false +# Enforce authorization on accessing functions api +authorizationEnabled: false +# Set of autentication provider name list, which is a list of class names +authenticationProviders: +# Set of role names that are treated as "super-user", meaning they will be able to access any admin-api +superUserRoles: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 6338ce98f2..f97f1807c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -82,8 +82,8 @@ public Response registerFunction(final @PathParam("tenant") String tenant, final @FormDataParam("url") String functionPkgUrl, final @FormDataParam("functionDetails") String functionDetailsJson) { - return functions.registerFunction( - tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson); + return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + functionPkgUrl, functionDetailsJson, clientAppId()); } @PUT @@ -103,8 +103,8 @@ public Response updateFunction(final @PathParam("tenant") String tenant, final @FormDataParam("url") String functionPkgUrl, final @FormDataParam("functionDetails") 
String functionDetailsJson) { - return functions.updateFunction( - tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson); + return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + functionPkgUrl, functionDetailsJson, clientAppId()); } @@ -122,8 +122,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant, public Response deregisterFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { - return functions.deregisterFunction( - tenant, namespace, functionName); + return functions.deregisterFunction(tenant, namespace, functionName, clientAppId()); } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index a7b34ae857..16f1a76597 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -77,6 +77,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.google.common.collect.Maps; @@ -99,7 +100,6 @@ PulsarAdmin admin; PulsarClient pulsarClient; BrokerStats brokerStatsClient; - WorkerServer functionsWorkerServer; WorkerService functionsWorkerService; final String tenant = "external-repl-prop"; String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; @@ -120,6 +120,11 @@ private static final Logger log = LoggerFactory.getLogger(PulsarSinkE2ETest.class); + @DataProvider(name = "validRoleName") + public Object[][] validRoleName() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + @BeforeMethod void setup(Method method) throws Exception { @@ -187,6 +192,7 @@ void 
setup(Method method) throws Exception { pulsarClient = clientBuilder.build(); TenantInfo propAdmin = new TenantInfo(); + propAdmin.getAdminRoles().add("superUser"); propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); admin.tenants().updateTenant(tenant, propAdmin); @@ -231,6 +237,9 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) { workerConfig.setUseTls(true); workerConfig.setTlsAllowInsecureConnection(true); workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH); + + workerConfig.setAuthenticationEnabled(true); + workerConfig.setAuthorizationEnabled(true); return new WorkerService(workerConfig); } @@ -416,4 +425,34 @@ protected FunctionDetails createSinkConfig(String jarFile, String tenant, String return functionDetailsBuilder.build(); } + + @Test(dataProvider = "validRoleName") + public void testAuthorization(boolean validRoleName) throws Exception { + + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String functionName = "PulsarSink-test"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + String roleName = validRoleName ? 
"superUser" : "invalid"; + TenantInfo propAdmin = new TenantInfo(); + propAdmin.getAdminRoles().add(roleName); + propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); + admin.tenants().updateTenant(tenant, propAdmin); + + String jarFilePathUrl = Utils.FILE + ":" + + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, + sinkTopic, subscriptionName); + try { + admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + Assert.assertTrue(validRoleName); + } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) { + Assert.assertFalse(validRoleName); + } + } } \ No newline at end of file diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index 9b10cbe55a..029f573d76 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -34,6 +34,12 @@ + + ${project.groupId} + pulsar-broker-common + ${project.version} + + ${project.groupId} pulsar-functions-runtime diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 09f54ef424..eda9b1515f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -20,14 +20,18 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.collect.Sets; import java.io.File; import java.io.IOException; import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Properties; +import java.util.Set; import org.apache.commons.lang3.StringUtils; +import 
org.apache.pulsar.common.configuration.PulsarConfiguration; import lombok.Data; import lombok.EqualsAndHashCode; @@ -42,7 +46,7 @@ @EqualsAndHashCode @ToString @Accessors(chain = true) -public class WorkerConfig implements Serializable { +public class WorkerConfig implements Serializable, PulsarConfiguration { private static final long serialVersionUID = 1L; @@ -74,6 +78,17 @@ private boolean tlsAllowInsecureConnection = false; private boolean tlsHostnameVerificationEnable = false; private int metricsSamplingPeriodSec = 60; + // Enforce authentication + private boolean authenticationEnabled = false; + // Autentication provider name list, which is a list of class names + private Set authenticationProviders = Sets.newTreeSet(); + // Enforce authorization on accessing functions admin-api + private boolean authorizationEnabled = false; + // Role names that are treated as "super-user", meaning they will be able to access any admin-api + private Set superUserRoles = Sets.newTreeSet(); + + private Properties properties = new Properties(); + @Data @Setter @@ -135,4 +150,9 @@ public static String unsafeLocalhostResolve() { throw new IllegalStateException("Failed to resolve localhost name.", ex); } } + + @Override + public void setProperties(Properties properties) { + this.properties = properties; + } } \ No newline at end of file diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 29dff537f1..6af9c8f2bf 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -35,9 +35,12 @@ import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import 
org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; /** * A service component contains everything to run a worker except rest server. @@ -57,8 +60,9 @@ private SchedulerManager schedulerManager; private boolean isInitialized = false; private final ScheduledExecutorService statsUpdater; - + private AuthenticationService authenticationService; private ConnectorsManager connectorsManager; + private PulsarAdmin admin; public WorkerService(WorkerConfig workerConfig) { this.workerConfig = workerConfig; @@ -68,6 +72,11 @@ public WorkerService(WorkerConfig workerConfig) { public void start(URI dlogUri) throws InterruptedException { log.info("Starting worker {}...", workerConfig.getWorkerId()); + + this.admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(), + workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(), + workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection()); + try { log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter() .writeValueAsString(workerConfig)); @@ -128,6 +137,8 @@ public void start(URI dlogUri) throws InterruptedException { // initialize function metadata manager this.functionMetaDataManager.initialize(); + + authenticationService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)); // Starting cluster services log.info("Start cluster services..."); @@ -200,6 +211,10 @@ public void stop() { if (null != schedulerManager) { schedulerManager.close(); } + + if (null != this.admin) { + this.admin.close(); + } } } diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java index 1c5c739982..4673d5645f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java @@ -18,9 +18,13 @@ */ package org.apache.pulsar.functions.worker.rest; +import java.util.Optional; import java.util.function.Supplier; import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Context; + +import org.apache.pulsar.broker.web.AuthenticationFilter; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; @@ -32,6 +36,8 @@ private WorkerService workerService; @Context protected ServletContext servletContext; + @Context + protected HttpServletRequest httpRequest; public FunctionApiResource() { this.functions = new FunctionsImpl(this); @@ -44,4 +50,10 @@ public synchronized WorkerService get() { } return this.workerService; } + + public String clientAppId() { + return httpRequest != null + ? 
(String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName) + : null; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java index 524a6ad295..a57a9529fa 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java @@ -19,8 +19,15 @@ package org.apache.pulsar.functions.worker.rest; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; +import java.util.Map; + +import javax.servlet.DispatcherType; + import lombok.extern.slf4j.Slf4j; + +import org.apache.pulsar.broker.web.AuthenticationFilter; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.eclipse.jetty.server.Handler; @@ -28,9 +35,11 @@ import java.net.BindException; import java.net.URI; + import org.eclipse.jetty.server.handler.ContextHandlerCollection; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.glassfish.jersey.server.ResourceConfig; @@ -41,6 +50,7 @@ private final WorkerConfig workerConfig; private final WorkerService workerService; + private static final String MATCH_ALL = "/*"; private static String getErrorMessage(Server server, int port, Exception ex) { if (ex instanceof BindException) { @@ -106,7 +116,12 @@ public static ServletContextHandler newServletContextHandler(String contextPath, final ServletHolder apiServlet = new ServletHolder(new ServletContainer(config)); contextHandler.addServlet(apiServlet, "/*"); + if 
(workerService.getWorkerConfig().isAuthenticationEnabled()) { + FilterHolder filter = new FilterHolder(new AuthenticationFilter(workerService.getAuthenticationService())); + contextHandler.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + } return contextHandler; } + } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 2ac48dc960..b935bf5f58 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -57,12 +57,14 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -117,11 +119,24 @@ private boolean isWorkerServiceAvailable() { public Response registerFunction(final String tenant, final String namespace, final String functionName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, - final String functionPkgUrl, final String functionDetailsJson) { + final String functionPkgUrl, final String functionDetailsJson, final String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); } + + try { + if (!isAuthorizedRole(tenant, clientRole)) { + log.error("{}/{}/{} 
Client [{}] is not admin and authorized to register function", tenant, namespace, functionName, + clientRole); + return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("client is not authorize to perform operation")).build(); + } + } catch (PulsarAdminException e) { + log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } FunctionDetails functionDetails; boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl); @@ -168,12 +183,25 @@ public Response registerFunction(final String tenant, final String namespace, fi public Response updateFunction(final String tenant, final String namespace, final String functionName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, - final String functionPkgUrl, final String functionDetailsJson) { + final String functionPkgUrl, final String functionDetailsJson, final String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); } + try { + if (!isAuthorizedRole(tenant, clientRole)) { + log.error("{}/{}/{} Client [{}] is not admin and authorized to update function", tenant, namespace, + functionName, clientRole); + return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("client is not authorize to perform operation")).build(); + } + } catch (PulsarAdminException e) { + log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + FunctionDetails functionDetails; boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl); // validate parameters @@ -217,12 +245,26 @@ public Response updateFunction(final String tenant, final String namespace, 
fina : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); } - public Response deregisterFunction(final String tenant, final String namespace, final String functionName) { + public Response deregisterFunction(final String tenant, final String namespace, final String functionName, + String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); } + try { + if (!isAuthorizedRole(tenant, clientRole)) { + log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister function", tenant, namespace, + functionName, clientRole); + return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("client is not authorize to perform operation")).build(); + } + } catch (PulsarAdminException e) { + log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + // validate parameters try { validateDeregisterRequestParams(tenant, namespace, functionName); @@ -893,5 +935,18 @@ public static String createPackagePath(String tenant, String namespace, String f return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName), Utils.getUniquePackageName(Codec.encode(fileName))); } + + private boolean isAuthorizedRole(String tenant, String clientRole) throws PulsarAdminException { + if (worker().getWorkerConfig().isAuthorizationEnabled()) { + // skip authorization if client role is super-user + if (clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole)) { + return true; + } + TenantInfo tenantInfo = worker().getAdmin().tenants().getTenantInfo(tenant); + return clientRole != null && (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty() + || tenantInfo.getAdminRoles().contains(clientRole)); + } + return true; + } } diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index a23c41bc1e..92957b32d4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -62,8 +62,8 @@ public Response registerFunction(final @PathParam("tenant") String tenant, final @FormDataParam("url") String functionPkgUrl, final @FormDataParam("functionDetails") String functionDetailsJson) { - return functions.registerFunction( - tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson); + return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + functionPkgUrl, functionDetailsJson, clientAppId()); } @@ -78,8 +78,8 @@ public Response updateFunction(final @PathParam("tenant") String tenant, final @FormDataParam("url") String functionPkgUrl, final @FormDataParam("functionDetails") String functionDetailsJson) { - return functions.updateFunction( - tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson); + return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + functionPkgUrl, functionDetailsJson, clientAppId()); } @@ -87,10 +87,8 @@ public Response updateFunction(final @PathParam("tenant") String tenant, @DELETE @Path("/{tenant}/{namespace}/{functionName}") public Response deregisterFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName) { - return functions.deregisterFunction( - tenant, namespace, functionName); + final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName) { + return functions.deregisterFunction(tenant, namespace, functionName, clientAppId()); } @GET diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 866b92e8ed..c59d03dde4 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -314,7 +314,8 @@ private void testRegisterFunctionMissingArguments( inputStream, details, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); if (missingFieldName.equals("parallelism")) { @@ -342,7 +343,8 @@ private Response registerDefaultFunction() throws IOException { mockedInputStream, mockedFormData, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null); } @Test @@ -587,7 +589,8 @@ private void testUpdateFunctionMissingArguments( inputStream, details, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); if (missingFieldName.equals("parallelism")) { @@ -615,7 +618,8 @@ private Response updateDefaultFunction() throws IOException { mockedInputStream, mockedFormData, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null); } @Test @@ -696,7 +700,8 @@ public void 
testUpdateFunctionWithUrl() throws IOException { null, null, filePackageUrl, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null); assertEquals(Status.OK.getStatusCode(), response.getStatus()); } @@ -783,7 +788,8 @@ private void testDeregisterFunctionMissingArguments( Response response = resource.deregisterFunction( tenant, namespace, - function); + function, + null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); @@ -793,7 +799,8 @@ private Response deregisterDefaultFunction() { return resource.deregisterFunction( tenant, namespace, - function); + function, + null); } @Test @@ -1043,7 +1050,7 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException { .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName)) .build(); Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null); assertEquals(Status.OK.getStatusCode(), response.getStatus()); } @@ -1068,7 +1075,7 @@ public void testRegisterFunctionFileUrlWithInValidSinkClass() throws IOException .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName)) .build(); Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services