pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Improve and add authorization to function download and upload (#4644)
Date Tue, 02 Jul 2019 17:58:38 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 20cf739   Improve and add authorization to function download and upload (#4644)
20cf739 is described below

commit 20cf73932c91f773df5cd5dbeadd176216d60e4c
Author: Boyang Jerry Peng <jerry.boyang.peng@gmail.com>
AuthorDate: Tue Jul 2 10:58:32 2019 -0700

     Improve and add authorization to function download and upload (#4644)
    
    * Improve and add authorization to function download and upload
    
    * cleaning up
    
    * fix bug
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  25 ++++-
 .../apache/pulsar/broker/admin/v2/Functions.java   |   4 +-
 .../org/apache/pulsar/client/admin/Functions.java  |  15 +++
 .../client/admin/internal/FunctionsImpl.java       |   9 +-
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  17 ++-
 .../functions/runtime/KubernetesRuntime.java       |  23 +++-
 .../functions/worker/rest/api/ComponentImpl.java   | 122 +++++++++++++++++----
 .../functions/worker/rest/api/FunctionsImplV2.java |   6 +-
 .../worker/rest/api/v2/FunctionsApiV2Resource.java |   8 +-
 .../worker/rest/api/v3/FunctionsApiV3Resource.java |  22 +++-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |   8 +-
 11 files changed, 211 insertions(+), 48 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index c6f38d0..fa5b366 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -632,24 +632,41 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
 
     @POST
     @ApiOperation(
-            value = "Uploads Pulsar Function file data",
+            value = "Uploads Pulsar Function file data (Admin only)",
             hidden = true
     )
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
                                final @FormDataParam("path") String path) {
-        functions.uploadFunction(uploadedInputStream, path);
+        functions.uploadFunction(uploadedInputStream, path, clientAppId());
     }
 
     @GET
     @ApiOperation(
-            value = "Downloads Pulsar Function file data",
+            value = "Downloads Pulsar Function file data (Admin only)",
             hidden = true
     )
     @Path("/download")
     public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
-        return functions.downloadFunction(path);
+        return functions.downloadFunction(path, clientAppId(), clientAuthData());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Downloads Pulsar Function file data",
+            hidden = true
+    )
+    @Path("/{tenant}/{namespace}/{functionName}/download")
+    public StreamingOutput downloadFunction(
+            @ApiParam(value = "The tenant of functions")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of functions")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of functions")
+            final @PathParam("functionName") String functionName) {
+
+        return functions.downloadFunction(tenant, namespace, functionName, clientAppId(),
clientAuthData());
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index 051bcc6..4779461 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -291,14 +291,14 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
 
     @POST
     @ApiOperation(
-            value = "Uploads Pulsar Function file data",
+            value = "Uploads Pulsar Function file data (admin only)",
             hidden = true
     )
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
                                    final @FormDataParam("path") String path) {
-        return functions.uploadFunction(uploadedInputStream, path);
+        return functions.uploadFunction(uploadedInputStream, path, clientAppId());
     }
 
     @GET
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 977d195..d6a7ff3 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -422,6 +422,21 @@ public interface Functions {
     void downloadFunction(String destinationFile, String path) throws PulsarAdminException;
 
     /**
+     * Download Function Code.
+     *
+     * @param destinationFile
+     *           file where data should be downloaded to
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @throws PulsarAdminException
+     */
+    void downloadFunction(String destinationFile, String tenant, String namespace, String
function) throws PulsarAdminException;
+
+    /**
      * Deprecated in favor of getting sources and sinks for their own APIs
      *
      * Fetches a list of supported Pulsar IO connectors currently running in cluster mode
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index ba7d7f2..bc2982d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -369,8 +369,16 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     }
 
     @Override
+    public void downloadFunction(String destinationPath, String tenant, String namespace,
String functionName) throws PulsarAdminException {
+        downloadFile(destinationPath, functions.path(tenant).path(namespace).path(functionName).path("download"));
+    }
+
+    @Override
     public void downloadFunction(String destinationPath, String path) throws PulsarAdminException
{
+        downloadFile(destinationPath, functions.path("download").queryParam("path", path));
+    }
 
+    private void downloadFile(String destinationPath, WebTarget target) throws PulsarAdminException
{
         HttpResponseStatus status;
         try {
             File file = new File(destinationPath);
@@ -378,7 +386,6 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 file.createNewFile();
             }
             FileChannel os = new FileOutputStream(new File(destinationPath)).getChannel();
-            WebTarget target = functions.path("download").queryParam("path", path);
 
             RequestBuilder builder = get(target.getUri().toASCIIString());
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index deec5ff..93471b2 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -917,7 +917,7 @@ public class CmdFunctions extends CmdBase {
     }
 
     @Parameters(commandDescription = "Download File Data from Pulsar", hidden = true)
-    class DownloadFunction extends BaseCommand {
+    class DownloadFunction extends FunctionCommand {
         // for backward compatibility purposes
         @Parameter(
                 names = "--destinationFile",
@@ -932,7 +932,7 @@ public class CmdFunctions extends CmdBase {
         @Parameter(
                 names = "--path",
                 description = "Path where the contents are to be stored",
-                listConverter = StringConverter.class, required = true)
+                listConverter = StringConverter.class, required = false, hidden = true)
         protected String path;
 
         private void mergeArgs() {
@@ -940,13 +940,24 @@ public class CmdFunctions extends CmdBase {
         }
 
         @Override
+        void processArguments() throws Exception {
+            if (path == null) {
+                super.processArguments();
+            }
+        }
+
+        @Override
         void runCmd() throws Exception {
             // merge deprecated args with new args
             mergeArgs();
             if (StringUtils.isBlank(destinationFile)) {
                 throw new ParameterException("--destination-file needs to be specified");
             }
-            admin.functions().downloadFunction(destinationFile, path);
+            if (path != null) {
+                admin.functions().downloadFunction(destinationFile, path);
+            } else {
+                admin.functions().downloadFunction(destinationFile, tenant, namespace, functionName);
+            }
             print("Downloaded successfully");
         }
     }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 4c1e3d9..e2862ec 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -751,13 +751,16 @@ public class KubernetesRuntime implements Runtime {
         return Arrays.asList(
                 "sh",
                 "-c",
-                String.join(" ", getDownloadCommand(userCodePkgUrl, originalCodeFileName))
+                String.join(" ", getDownloadCommand(instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace(),
+                        instanceConfig.getFunctionDetails().getName(),
+                        originalCodeFileName))
                         + " && " + setShardIdEnvironmentVariableCommand()
                         + " && " + String.join(" ", processArgs)
         );
     }
 
-    private List<String> getDownloadCommand(String bkPath, String userCodeFilePath)
{
+    private List<String> getDownloadCommand(String tenant, String namespace, String
name, String userCodeFilePath) {
 
         // add auth plugin and parameters if necessary
         if (authenticationEnabled && authConfig != null) {
@@ -774,8 +777,12 @@ public class KubernetesRuntime implements Runtime {
                         pulsarAdminUrl,
                         "functions",
                         "download",
-                        "--path",
-                        bkPath,
+                        "--tenant",
+                        tenant,
+                        "--namespace",
+                        namespace,
+                        "--name",
+                        name,
                         "--destination-file",
                         userCodeFilePath);
             }
@@ -787,8 +794,12 @@ public class KubernetesRuntime implements Runtime {
                 pulsarAdminUrl,
                 "functions",
                 "download",
-                "--path",
-                bkPath,
+                "--tenant",
+                tenant,
+                "--namespace",
+                namespace,
+                "--name",
+                name,
                 "--destination-file",
                 userCodeFilePath);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 3b26c31..62a5195 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -80,7 +80,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -357,7 +356,6 @@ public abstract class ComponentImpl {
             log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName,
e);
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
-
         // delete state table
         if (null != worker().getStateStoreAdminClient()) {
             final String tableNs = getStateNamespace(tenant, namespace);
@@ -1175,7 +1173,16 @@ public abstract class ComponentImpl {
         }
     }
 
-    public void uploadFunction(final InputStream uploadedInputStream, final String path)
{
+    public void uploadFunction(final InputStream uploadedInputStream, final String path,
String clientRole) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole))
{
+            throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform
operation");
+        }
+
         // validate parameters
         try {
             if (uploadedInputStream == null || path == null) {
@@ -1196,27 +1203,100 @@ public abstract class ComponentImpl {
         }
     }
 
-    public StreamingOutput downloadFunction(final String path) {
-
-        final StreamingOutput streamingOutput = new StreamingOutput() {
-            @Override
-            public void write(final OutputStream output) throws IOException {
-                if (path.startsWith(org.apache.pulsar.common.functions.Utils.HTTP)) {
-                    URL url = new URL(path);
-                    IOUtils.copy(url.openStream(), output);
-                } else if (path.startsWith(org.apache.pulsar.common.functions.Utils.FILE))
{
-                    URL url = new URL(path);
-                    File file;
-                    try {
-                        file = new File(url.toURI());
-                        IOUtils.copy(new FileInputStream(file), output);
-                    } catch (URISyntaxException e) {
-                        throw new IllegalArgumentException("invalid file url path: " + path);
+    public StreamingOutput downloadFunction(String tenant, String namespace, String componentName,
+                                            String clientRole, AuthenticationDataHttps clientAuthenticationDataHttps)
{
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps))
{
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to download package
for {} ", tenant, namespace,
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to
perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName,
e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName))
{
+            log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType),
tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist",
ComponentTypeUtils.toString(componentType), componentName));
+        }
+
+        String pkgPath = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName)
+                .getPackageLocation().getPackagePath();
+
+        final StreamingOutput streamingOutput = output -> {
+            if (pkgPath.startsWith(Utils.HTTP)) {
+                URL url = new URL(pkgPath);
+                IOUtils.copy(url.openStream(), output);
+            } else if (pkgPath.startsWith(Utils.FILE)) {
+                URL url = new URL(pkgPath);
+                File file;
+                try {
+                    file = new File(url.toURI());
+                    IOUtils.copy(new FileInputStream(file), output);
+                } catch (URISyntaxException e) {
+                    throw new IllegalArgumentException("invalid file url path: " + pkgPath);
+                }
+            } else {
+                WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath);
+            }
+        };
+
+        return streamingOutput;
+    }
+
+    public StreamingOutput downloadFunction(final String path, String clientRole, AuthenticationDataHttps
clientAuthenticationDataHttps) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (worker().getWorkerConfig().isAuthorizationEnabled()) {
+            // to maintain backwards compatiblity but still have authorization
+            String[] tokens = path.split("/");
+            if (tokens.length == 4) {
+                String tenant = tokens[0];
+                String namespace = tokens[1];
+                String componentName = tokens[2];
+
+                try {
+                    if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps))
{
+                        log.error("{}/{}/{} Client [{}] is not admin and authorized to download
package for {} ", tenant, namespace,
+                                componentName, clientRole, ComponentTypeUtils.toString(componentType));
+                        throw new RestException(Status.UNAUTHORIZED, "client is not authorize
to perform operation");
                     }
-                } else {
-                    WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output,
path);
+                } catch (PulsarAdminException e) {
+                    log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName,
e);
+                    throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+                }
+            } else {
+                if (!isSuperUser(clientRole)) {
+                    throw new RestException(Status.UNAUTHORIZED, "client is not authorize
to perform operation");
                 }
             }
+        }
+
+        final StreamingOutput streamingOutput = output -> {
+            if (path.startsWith(Utils.HTTP)) {
+                URL url = new URL(path);
+                IOUtils.copy(url.openStream(), output);
+            } else if (path.startsWith(Utils.FILE)) {
+                URL url = new URL(path);
+                File file;
+                try {
+                    file = new File(url.toURI());
+                    IOUtils.copy(new FileInputStream(file), output);
+                } catch (URISyntaxException e) {
+                    throw new IllegalArgumentException("invalid file url path: " + path);
+                }
+            } else {
+                WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
+            }
         };
 
         return streamingOutput;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index d4e6414..50fe96c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -177,13 +177,13 @@ public class FunctionsImplV2 {
         return Response.ok().build();
     }
 
-    public Response uploadFunction(InputStream uploadedInputStream, String path) {
-        delegate.uploadFunction(uploadedInputStream, path);
+    public Response uploadFunction(InputStream uploadedInputStream, String path, String clientRole)
{
+        delegate.uploadFunction(uploadedInputStream, path, clientRole);
         return Response.ok().build();
     }
 
     public Response downloadFunction(String path, String clientRole) {
-        return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path)).build();
+        return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path,
clientRole, null)).build();
     }
 
     public List<ConnectorDefinition> getListOfConnectors() {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionsApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionsApiV2Resource.java
index 6486187..0165698 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionsApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionsApiV2Resource.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
 import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
@@ -41,6 +42,7 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -283,19 +285,19 @@ public class FunctionsApiV2Resource extends FunctionApiResource {
 
     @POST
     @ApiOperation(
-            value = "Uploads Pulsar Function file data",
+            value = "Uploads Pulsar Function file data (admin only)",
             hidden = true
     )
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
                                    final @FormDataParam("path") String path) {
-        return functions.uploadFunction(uploadedInputStream, path);
+        return functions.uploadFunction(uploadedInputStream, path, clientAppId());
     }
 
     @GET
     @ApiOperation(
-            value = "Downloads Pulsar Function file data",
+            value = "Downloads Pulsar Function file data (admin only)",
             hidden = true
     )
     @Path("/download")
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 0bbacaa..203ae62 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
@@ -297,13 +298,30 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
                                final @FormDataParam("path") String path) {
-        functions.uploadFunction(uploadedInputStream, path);
+        functions.uploadFunction(uploadedInputStream, path, clientAppId());
     }
 
     @GET
     @Path("/download")
     public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
-        return functions.downloadFunction(path);
+        return functions.downloadFunction(path, clientAppId(), clientAuthData());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Downloads Pulsar Function file data",
+            hidden = true
+    )
+    @Path("/{tenant}/{namespace}/{functionName}/download")
+    public StreamingOutput downloadFunction(
+            @ApiParam(value = "The tenant of functions")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of functions")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of functions")
+            final @PathParam("functionName") String functionName) {
+
+        return functions.downloadFunction(tenant, namespace, functionName, clientAppId(),
clientAuthData());
     }
 
     @GET
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 635e4e6..6075d68 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import com.google.common.collect.Lists;
-import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
@@ -80,6 +79,7 @@ import java.util.concurrent.CompletableFuture;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -1442,11 +1442,12 @@ public class FunctionApiV3ResourceTest {
         String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
         String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         WorkerService worker = mock(WorkerService.class);
+        doReturn(true).when(worker).isInitialized();
         WorkerConfig config = mock(WorkerConfig.class);
         when(config.isAuthorizationEnabled()).thenReturn(false);
         when(worker.getWorkerConfig()).thenReturn(config);
         FunctionsImpl function = new FunctionsImpl(()-> worker);
-        StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl);
+        StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl, null, null);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
@@ -1461,11 +1462,12 @@ public class FunctionApiV3ResourceTest {
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         WorkerService worker = mock(WorkerService.class);
+        doReturn(true).when(worker).isInitialized();
         WorkerConfig config = mock(WorkerConfig.class);
         when(config.isAuthorizationEnabled()).thenReturn(false);
         when(worker.getWorkerConfig()).thenReturn(config);
         FunctionsImpl function = new FunctionsImpl(() -> worker);
-        StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation);
+        StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation,
null, null);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);


Mime
View raw message