From commits-return-11642-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Tue Jul 24 05:18:32 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 3015E180647 for ; Tue, 24 Jul 2018 05:18:31 +0200 (CEST) Received: (qmail 57646 invoked by uid 500); 24 Jul 2018 03:18:30 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 57637 invoked by uid 99); 24 Jul 2018 03:18:30 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jul 2018 03:18:30 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id A099F81E61; Tue, 24 Jul 2018 03:18:29 +0000 (UTC) Date: Tue, 24 Jul 2018 03:18:29 +0000 To: "commits@pulsar.apache.org" Subject: [incubator-pulsar] branch master updated: Add authorization support on function apis (#2213) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153240230951.942.9427420712143177571@gitbox.apache.org> From: rdhabalia@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pulsar X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 2543ccfe6d5978a32a1cab7e346569af8d504358 X-Git-Newrev: ce6fe8b9b757c505afafd8f209f673ec86733aa7 X-Git-Rev: ce6fe8b9b757c505afafd8f209f673ec86733aa7 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new ce6fe8b Add authorization support on function apis (#2213) ce6fe8b is described below commit ce6fe8b9b757c505afafd8f209f673ec86733aa7 Author: Rajan Dhabalia AuthorDate: Mon Jul 23 20:18:25 2018 -0700 Add authorization support on function apis (#2213) * Add authorization support on function apis * fix authorization enable check --- conf/functions_worker.yml | 8 +++ .../pulsar/broker/admin/impl/FunctionsBase.java | 11 ++-- .../org/apache/pulsar/io/PulsarSinkE2ETest.java | 41 ++++++++++++++- pulsar-functions/worker/pom.xml | 6 +++ .../pulsar/functions/worker/WorkerConfig.java | 22 +++++++- .../pulsar/functions/worker/WorkerService.java | 17 +++++- .../functions/worker/rest/FunctionApiResource.java | 12 +++++ .../pulsar/functions/worker/rest/WorkerServer.java | 15 ++++++ .../functions/worker/rest/api/FunctionsImpl.java | 61 ++++++++++++++++++++-- .../worker/rest/api/v2/FunctionApiV2Resource.java | 14 +++-- .../rest/api/v2/FunctionApiV2ResourceTest.java | 25 +++++---- 11 files changed, 203 insertions(+), 29 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 58bcf1d..4194337 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -45,3 +45,11 @@ initialBrokerReconnectMaxRetries: 60 assignmentWriteMaxRetries: 60 instanceLivenessCheckFreqMs: 30000 metricsSamplingPeriodSec: 60 +# Enforce authentication +authenticationEnabled: false +# Enforce authorization on accessing functions api +authorizationEnabled: false +# Set of autentication provider name list, which is a list of class names +authenticationProviders: +# Set of role names that are treated as "super-user", meaning they will be able to access any admin-api +superUserRoles: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 6338ce9..f97f180 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -82,8 +82,8 @@ public class FunctionsBase extends AdminResource implements Supplier clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + String roleName = validRoleName ? "superUser" : "invalid"; + TenantInfo propAdmin = new TenantInfo(); + propAdmin.getAdminRoles().add(roleName); + propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); + admin.tenants().updateTenant(tenant, propAdmin); + + String jarFilePathUrl = Utils.FILE + ":" + + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, + sinkTopic, subscriptionName); + try { + admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + Assert.assertTrue(validRoleName); + } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) { + Assert.assertFalse(validRoleName); + } + } } \ No newline at end of file diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index 9b10cbe..029f573 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -36,6 +36,12 @@ ${project.groupId} + pulsar-broker-common + ${project.version} + + + + ${project.groupId} pulsar-functions-runtime ${project.version} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 09f54ef..eda9b15 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -20,14 +20,18 @@ package org.apache.pulsar.functions.worker; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.collect.Sets; import java.io.File; import java.io.IOException; import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Properties; +import java.util.Set; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.configuration.PulsarConfiguration; import lombok.Data; import lombok.EqualsAndHashCode; @@ -42,7 +46,7 @@ import lombok.experimental.Accessors; @EqualsAndHashCode @ToString @Accessors(chain = true) -public class WorkerConfig implements Serializable { +public class WorkerConfig implements Serializable, PulsarConfiguration { private static final long serialVersionUID = 1L; @@ -74,6 +78,17 @@ public class WorkerConfig implements Serializable { private boolean tlsAllowInsecureConnection = false; private boolean tlsHostnameVerificationEnable = false; private int metricsSamplingPeriodSec = 60; + // Enforce authentication + private boolean authenticationEnabled = false; + // Autentication provider name list, which is a list of class names + private Set authenticationProviders = Sets.newTreeSet(); + // Enforce authorization on accessing functions admin-api + private boolean authorizationEnabled = false; + // Role names that are treated as "super-user", meaning they will be able to access any admin-api + private Set superUserRoles = Sets.newTreeSet(); + + private Properties properties = new Properties(); + @Data @Setter @@ -135,4 +150,9 @@ public class WorkerConfig implements Serializable { throw new IllegalStateException("Failed to resolve localhost name.", ex); } } + + @Override + public void setProperties(Properties properties) { + this.properties = properties; + } } \ No newline at end of file diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 29dff53..6af9c8f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -35,9 +35,12 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; /** * A service component contains everything to run a worker except rest server. @@ -57,8 +60,9 @@ public class WorkerService { private SchedulerManager schedulerManager; private boolean isInitialized = false; private final ScheduledExecutorService statsUpdater; - + private AuthenticationService authenticationService; private ConnectorsManager connectorsManager; + private PulsarAdmin admin; public WorkerService(WorkerConfig workerConfig) { this.workerConfig = workerConfig; @@ -68,6 +72,11 @@ public class WorkerService { public void start(URI dlogUri) throws InterruptedException { log.info("Starting worker {}...", workerConfig.getWorkerId()); + + this.admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(), + workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(), + workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection()); + try { log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter() .writeValueAsString(workerConfig)); @@ -128,6 +137,8 @@ public class WorkerService { // initialize function metadata manager this.functionMetaDataManager.initialize(); + + authenticationService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)); // Starting cluster services log.info("Start cluster services..."); @@ -200,6 +211,10 @@ public class WorkerService { if (null != schedulerManager) { schedulerManager.close(); } + + if (null != this.admin) { + this.admin.close(); + } } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java index 1c5c739..4673d56 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java @@ -18,9 +18,13 @@ */ package org.apache.pulsar.functions.worker.rest; +import java.util.Optional; import java.util.function.Supplier; import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Context; + +import org.apache.pulsar.broker.web.AuthenticationFilter; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; @@ -32,6 +36,8 @@ public class FunctionApiResource implements Supplier { private WorkerService workerService; @Context protected ServletContext servletContext; + @Context + protected HttpServletRequest httpRequest; public FunctionApiResource() { this.functions = new FunctionsImpl(this); @@ -44,4 +50,10 @@ public class FunctionApiResource implements Supplier { } return this.workerService; } + + public String clientAppId() { + return httpRequest != null + ? (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName) + : null; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java index 524a6ad..a57a952 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java @@ -19,8 +19,15 @@ package org.apache.pulsar.functions.worker.rest; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; +import java.util.Map; + +import javax.servlet.DispatcherType; + import lombok.extern.slf4j.Slf4j; + +import org.apache.pulsar.broker.web.AuthenticationFilter; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.eclipse.jetty.server.Handler; @@ -28,9 +35,11 @@ import org.eclipse.jetty.server.Server; import java.net.BindException; import java.net.URI; + import org.eclipse.jetty.server.handler.ContextHandlerCollection; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.glassfish.jersey.server.ResourceConfig; @@ -41,6 +50,7 @@ public class WorkerServer implements Runnable { private final WorkerConfig workerConfig; private final WorkerService workerService; + private static final String MATCH_ALL = "/*"; private static String getErrorMessage(Server server, int port, Exception ex) { if (ex instanceof BindException) { @@ -106,7 +116,12 @@ public class WorkerServer implements Runnable { final ServletHolder apiServlet = new ServletHolder(new ServletContainer(config)); contextHandler.addServlet(apiServlet, "/*"); + if (workerService.getWorkerConfig().isAuthenticationEnabled()) { + FilterHolder filter = new FilterHolder(new AuthenticationFilter(workerService.getAuthenticationService())); + contextHandler.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + } return contextHandler; } + } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 2ac48dc..b935bf5 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -57,12 +57,14 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -117,11 +119,24 @@ public class FunctionsImpl { public Response registerFunction(final String tenant, final String namespace, final String functionName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, - final String functionPkgUrl, final String functionDetailsJson) { + final String functionPkgUrl, final String functionDetailsJson, final String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); } + + try { + if (!isAuthorizedRole(tenant, clientRole)) { + log.error("{}/{}/{} Client [{}] is not admin and authorized to register function", tenant, namespace, functionName, + clientRole); + return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("client is not authorize to perform operation")).build(); + } + } catch (PulsarAdminException e) { + log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } FunctionDetails functionDetails; boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl); @@ -168,12 +183,25 @@ public class FunctionsImpl { public Response updateFunction(final String tenant, final String namespace, final String functionName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, - final String functionPkgUrl, final String functionDetailsJson) { + final String functionPkgUrl, final String functionDetailsJson, final String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); } + try { + if (!isAuthorizedRole(tenant, clientRole)) { + log.error("{}/{}/{} Client [{}] is not admin and authorized to update function", tenant, namespace, + functionName, clientRole); + return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("client is not authorize to perform operation")).build(); + } + } catch (PulsarAdminException e) { + log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + FunctionDetails functionDetails; boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl); // validate parameters @@ -217,12 +245,26 @@ public class FunctionsImpl { : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); } - public Response deregisterFunction(final String tenant, final String namespace, final String functionName) { + public Response deregisterFunction(final String tenant, final String namespace, final String functionName, + String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); } + try { + if (!isAuthorizedRole(tenant, clientRole)) { + log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister function", tenant, namespace, + functionName, clientRole); + return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("client is not authorize to perform operation")).build(); + } + } catch (PulsarAdminException e) { + log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + // validate parameters try { validateDeregisterRequestParams(tenant, namespace, functionName); @@ -893,5 +935,18 @@ public class FunctionsImpl { return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName), Utils.getUniquePackageName(Codec.encode(fileName))); } + + private boolean isAuthorizedRole(String tenant, String clientRole) throws PulsarAdminException { + if (worker().getWorkerConfig().isAuthorizationEnabled()) { + // skip authorization if client role is super-user + if (clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole)) { + return true; + } + TenantInfo tenantInfo = worker().getAdmin().tenants().getTenantInfo(tenant); + return clientRole != null && (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty() + || tenantInfo.getAdminRoles().contains(clientRole)); + } + return true; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index a23c41b..92957b3 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -62,8 +62,8 @@ public class FunctionApiV2Resource extends FunctionApiResource { final @FormDataParam("url") String functionPkgUrl, final @FormDataParam("functionDetails") String functionDetailsJson) { - return functions.registerFunction( - tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson); + return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + functionPkgUrl, functionDetailsJson, clientAppId()); } @@ -78,8 +78,8 @@ public class FunctionApiV2Resource extends FunctionApiResource { final @FormDataParam("url") String functionPkgUrl, final @FormDataParam("functionDetails") String functionDetailsJson) { - return functions.updateFunction( - tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson); + return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + functionPkgUrl, functionDetailsJson, clientAppId()); } @@ -87,10 +87,8 @@ public class FunctionApiV2Resource extends FunctionApiResource { @DELETE @Path("/{tenant}/{namespace}/{functionName}") public Response deregisterFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName) { - return functions.deregisterFunction( - tenant, namespace, functionName); + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.deregisterFunction(tenant, namespace, functionName, clientAppId()); } @GET diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 866b92e..c59d03d 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -314,7 +314,8 @@ public class FunctionApiV2ResourceTest { inputStream, details, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); if (missingFieldName.equals("parallelism")) { @@ -342,7 +343,8 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null); } @Test @@ -587,7 +589,8 @@ public class FunctionApiV2ResourceTest { inputStream, details, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); if (missingFieldName.equals("parallelism")) { @@ -615,7 +618,8 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, null, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null); } @Test @@ -696,7 +700,8 @@ public class FunctionApiV2ResourceTest { null, null, filePackageUrl, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + null); assertEquals(Status.OK.getStatusCode(), response.getStatus()); } @@ -783,7 +788,8 @@ public class FunctionApiV2ResourceTest { Response response = resource.deregisterFunction( tenant, namespace, - function); + function, + null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); @@ -793,7 +799,8 @@ public class FunctionApiV2ResourceTest { return resource.deregisterFunction( tenant, namespace, - function); + function, + null); } @Test @@ -1043,7 +1050,7 @@ public class FunctionApiV2ResourceTest { .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName)) .build(); Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null); assertEquals(Status.OK.getStatusCode(), response.getStatus()); } @@ -1068,7 +1075,7 @@ public class FunctionApiV2ResourceTest { .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName)) .build(); Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); }