pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rdhaba...@apache.org
Subject [incubator-pulsar] branch master updated: Fix: authorization while redirecting function admin call (#2416)
Date Wed, 22 Aug 2018 17:53:33 GMT
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cc60027  Fix: authorization while redirecting function admin call (#2416)
cc60027 is described below

commit cc60027c8dd74469d6e0438ef8df8ea06ada6f2d
Author: Rajan Dhabalia <rdhabalia@apache.org>
AuthorDate: Wed Aug 22 10:53:30 2018 -0700

    Fix: authorization while redirecting function admin call (#2416)
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  4 +-
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |  2 +-
 .../functions/worker/FunctionRuntimeManager.java   | 45 ++++++++--------------
 .../pulsar/functions/worker/WorkerConfig.java      |  7 +++-
 .../pulsar/functions/worker/WorkerService.java     | 24 +++++++++---
 .../functions/worker/rest/FunctionApiResource.java |  3 ++
 .../functions/worker/rest/api/FunctionsImpl.java   | 17 ++++----
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  5 ++-
 .../worker/FunctionRuntimeManagerTest.java         | 18 +++++++--
 .../functions/worker/MembershipManagerTest.java    | 19 +++++++--
 10 files changed, 90 insertions(+), 54 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 4384f50..315027f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -277,7 +277,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public Response restartFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName")
String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId);
+        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId,
uri.getRequestUri());
     }
 
     @POST
@@ -302,7 +302,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public Response stopFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName")
String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId);
+        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId,
uri.getRequestUri());
     }
 
     @POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 2254626..97de0b8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -116,7 +116,7 @@ public class PulsarFunctionTlsTest {
         PulsarAdmin admin = mock(PulsarAdmin.class);
         Tenants tenants = mock(Tenants.class);
         when(admin.tenants()).thenReturn(tenants);
-        when(functionsWorkerService.getAdmin()).thenReturn(admin);
+        when(functionsWorkerService.getBrokerAdmin()).thenReturn(admin);
         Set<String> admins = Sets.newHashSet("superUser");
         TenantInfo tenantInfo = new TenantInfo(admins, null);
         when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 1016171..93828de 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
@@ -45,6 +46,7 @@ import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.Response.Status;
 
 import java.util.Collection;
@@ -91,15 +93,15 @@ public class FunctionRuntimeManager implements AutoCloseable{
     private MembershipManager membershipManager;
     private final ConnectorsManager connectorsManager;
     
-    public FunctionRuntimeManager(WorkerConfig workerConfig,
-                                  PulsarClient pulsarClient,
-                                  Namespace dlogNamespace,
-                                  MembershipManager membershipManager,
-                                  ConnectorsManager connectorsManager) throws Exception {
+    private final PulsarAdmin functionAdmin;
+
+    public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService,
Namespace dlogNamespace,
+            MembershipManager membershipManager, ConnectorsManager connectorsManager) throws
Exception {
         this.workerConfig = workerConfig;
         this.connectorsManager = connectorsManager;
+        this.functionAdmin = workerService.getFunctionAdmin();
 
-        Reader<byte[]> reader = pulsarClient.newReader()
+        Reader<byte[]> reader = workerService.getClient().newReader()
                 .topic(this.workerConfig.getFunctionAssignmentTopic())
                 .startMessageId(MessageId.earliest)
                 .create();
@@ -327,7 +329,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
     }
 
     public Response stopFunctionInstance(String tenant, String namespace, String functionName,
int instanceId,
-            boolean restart) throws Exception {
+            boolean restart, URI uri) throws Exception {
         Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
         final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName,
instanceId);
         if (assignment == null) {
@@ -355,19 +357,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
                         .entity(new ErrorData(fullFunctionName + " has not been assigned
yet")).build();
             }
 
-            URI redirect = null;
-            String action = restart ? "restart" : "stop";
-            final String redirectUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s",
-                    workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace,
functionName, instanceId,
-                    action);
-            try {
-                redirect = new URI(redirectUrl);
-            } catch (URISyntaxException e) {
-                log.error("Error in preparing redirect url for {}/{}/{}/{}: {}", tenant,
namespace, functionName,
-                        instanceId, e.getMessage(), e);
-                return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData(fullFunctionName + " invalid redirection url")).build();
-            }
+            URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
             throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
         }
     }
@@ -401,14 +391,13 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     }
                     continue;
                 }
-                Client client = ClientBuilder.newClient();
-                String action = restart ? "restart" : "stop";
-                // TODO: create and use pulsar-admin to support authorization and authentication
and manage redirect
-                final String instanceRestartUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s",
-                        workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace,
functionName,
-                        assignment.getInstance().getInstanceId(), action);
-                client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON)
-                        .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+                if (restart) {
+                    this.functionAdmin.functions().restartFunction(tenant, namespace, functionName,
+                            assignment.getInstance().getInstanceId());
+                } else {
+                    this.functionAdmin.functions().stopFunction(tenant, namespace, functionName,
+                            assignment.getInstance().getInstanceId());
+                }
             }
         }
         return Response.status(Status.OK).build();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 35d3e18..38ef5d3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -56,6 +56,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     private int workerPortTls;
     private String connectorsDirectory = "./connectors";
     private String functionMetadataTopicName;
+    private String functionWebServiceUrl;
     private String pulsarServiceUrl;
     private String pulsarWebServiceUrl;
     private String clusterCoordinationTopicName;
@@ -153,7 +154,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration
{
         }
         return this.workerHostname;
     }
-    
+
+    public String getWorkerWebAddress() {
+        return String.format("http://%s:%d", this.getWorkerHostname(), this.getWorkerPort());
+    }
+
     public static String unsafeLocalhostResolve() {
         try {
             return InetAddress.getLocalHost().getHostName();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index eb543c4..0850766 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -32,6 +32,8 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
@@ -63,7 +65,8 @@ public class WorkerService {
     private final ScheduledExecutorService statsUpdater;
     private AuthenticationService authenticationService;
     private ConnectorsManager connectorsManager;
-    private PulsarAdmin admin;
+    private PulsarAdmin brokerAdmin;
+    private PulsarAdmin functionAdmin;
     private final MetricsGenerator metricsGenerator;
 
     public WorkerService(WorkerConfig workerConfig) {
@@ -76,7 +79,14 @@ public class WorkerService {
     public void start(URI dlogUri) throws InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
 
-        this.admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+        this.brokerAdmin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+                workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
+                workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
+        
+        final String functionWebServiceUrl = StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
+                ? workerConfig.getFunctionWebServiceUrl()
+                : workerConfig.getWorkerWebAddress(); 
+        this.functionAdmin = Utils.getPulsarAdminClient(functionWebServiceUrl,
                 workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
                 workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
 
@@ -131,7 +141,7 @@ public class WorkerService {
 
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
-                    this.workerConfig, this.client, this.dlogNamespace, this.membershipManager,
connectorsManager);
+                    this.workerConfig, this, this.dlogNamespace, this.membershipManager,
connectorsManager);
 
             // Setting references to managers in scheduler
             this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
@@ -215,8 +225,12 @@ public class WorkerService {
             schedulerManager.close();
         }
 
-        if (null != this.admin) {
-            this.admin.close();
+        if (null != this.brokerAdmin) {
+            this.brokerAdmin.close();
+        }
+        
+        if (null != this.functionAdmin) {
+            this.functionAdmin.close();
         }
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 4673d56..be97986 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -23,6 +23,7 @@ import java.util.function.Supplier;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
 
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -38,6 +39,8 @@ public class FunctionApiResource implements Supplier<WorkerService>
{
     protected ServletContext servletContext;
     @Context
     protected HttpServletRequest httpRequest;
+    @Context
+    protected UriInfo uri;
 
     public FunctionApiResource() {
         this.functions = new FunctionsImpl(this);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index c2747f4..d5f3f57 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.MalformedURLException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -386,17 +387,17 @@ public class FunctionsImpl {
     }
 
     public Response stopFunctionInstance(final String tenant, final String namespace, final
String functionName,
-            final String instanceId) {
-        return stopFunctionInstance(tenant, namespace, functionName, instanceId, false);
+            final String instanceId, URI uri) {
+        return stopFunctionInstance(tenant, namespace, functionName, instanceId, false, uri);
     }
 
     public Response restartFunctionInstance(final String tenant, final String namespace,
final String functionName,
-            final String instanceId) {
-        return stopFunctionInstance(tenant, namespace, functionName, instanceId, true);
+            final String instanceId, URI uri) {
+        return stopFunctionInstance(tenant, namespace, functionName, instanceId, true, uri);
     }
 
     public Response stopFunctionInstance(final String tenant, final String namespace, final
String functionName,
-            final String instanceId, boolean restart) {
+            final String instanceId, boolean restart, URI uri) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -413,7 +414,7 @@ public class FunctionsImpl {
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant,
namespace, functionName);
+            log.error("Function does not exist @ /{}/{}/{}", tenant, namespace, functionName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
         }
@@ -421,7 +422,7 @@ public class FunctionsImpl {
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         try {
             return functionRuntimeManager.stopFunctionInstance(tenant, namespace, functionName,
-                    Integer.parseInt(instanceId), restart);
+                    Integer.parseInt(instanceId), restart, uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
@@ -1057,7 +1058,7 @@ public class FunctionsImpl {
             if (isSuperUser(clientRole)) {
                 return true;
             }
-            TenantInfo tenantInfo = worker().getAdmin().tenants().getTenantInfo(tenant);
+            TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
             return clientRole != null && (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()
                     || tenantInfo.getAdminRoles().contains(clientRole));
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index ddea22a..e13f69c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -25,6 +25,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
 import java.util.List;
 
 import javax.ws.rs.Consumes;
@@ -171,7 +172,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response restartFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName")
String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId);
+        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId,
this.uri.getRequestUri());
     }
 
     @POST
@@ -196,7 +197,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response stopFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName")
String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId);
+        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId,
this.uri.getRequestUri());
     }
 
     @POST
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 4f618c4..85a2122 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker;
 
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
@@ -86,11 +87,14 @@ public class FunctionRuntimeManagerTest {
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
-
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+        
         // test new assignment add functions
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
-                pulsarClient,
+                workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class)
@@ -180,11 +184,14 @@ public class FunctionRuntimeManagerTest {
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
 
         // test new assignment delete functions
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
-                pulsarClient,
+                workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class)
@@ -278,11 +285,14 @@ public class FunctionRuntimeManagerTest {
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
 
         // test new assignment update functions
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
-                pulsarClient,
+                workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class)
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index e0a2428..7ed6aca 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerEventListener;
@@ -129,9 +130,13 @@ public class MembershipManagerTest {
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+        
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
-                pulsarClient,
+                workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class)
@@ -194,9 +199,13 @@ public class MembershipManagerTest {
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+        
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
-                pulsarClient,
+                workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class)
@@ -284,9 +293,13 @@ public class MembershipManagerTest {
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+        
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
-                pulsarClient,
+                workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class)


Mime
View raw message