pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #2438: Fix: Function assignment can support large number of topics
Date Tue, 28 Aug 2018 06:56:52 GMT
merlimat closed pull request #2438: Fix: Function assignment can support large number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 444b7fb9f6..497c414bc3 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,6 +27,7 @@ connectorsDirectory: ./connectors
 functionMetadataTopicName: metadata
 clusterCoordinationTopicName: coordinate
 pulsarFunctionsNamespace: public/functions
+pulsarAssignmentNamespace: public/assignment
 pulsarFunctionsCluster: standalone
 pulsarServiceUrl: pulsar://localhost:6650
 pulsarWebServiceUrl: http://localhost:8080
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index dd39222248..54c14c05b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -105,6 +105,7 @@
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
     String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String pulsarAssignmentNamespace = tenant + "/use/pulsar-assignment";
     String primaryHost;
     String workerId;
 
@@ -212,6 +213,7 @@ void shutdown() throws Exception {
     private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
         workerConfig = new WorkerConfig();
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setPulsarAssignmentNamespace(pulsarAssignmentNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 366eaba6a0..060120b10f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -20,11 +20,15 @@
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.functions.proto.Request;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
+import java.util.List;
 import java.util.function.Function;
 
 @Slf4j
@@ -33,13 +37,22 @@
 
         private final FunctionRuntimeManager functionRuntimeManager;
         private final Reader<byte[]> reader;
+        
+        private long currentVersion = 0;
+        private final List<Request.AssignmentsUpdate> currentVersionAssignments;
+        private volatile MessageId previousOldAssignmentMsgId = null;
 
     public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager,
                 Reader<byte[]> reader)
             throws PulsarClientException {
-            this.functionRuntimeManager = functionRuntimeManager;
-            this.reader = reader;
+        this.functionRuntimeManager = functionRuntimeManager;
+        this.reader = reader;
+        this.currentVersionAssignments = Lists.newArrayList();
+        // complete init if reader has no message to read so, scheduled-manager can schedule assignments
+        if (!hasMessageAvailable()) {
+            this.functionRuntimeManager.initialized = true;
         }
+    }
 
     public void start() {
 
@@ -66,29 +79,40 @@ public void close() {
     @Override
     public void accept(Message<byte[]> msg) {
 
-        // check if latest
-        boolean hasMessageAvailable;
+        Request.AssignmentsUpdate assignmentsUpdate;
         try {
-            hasMessageAvailable = this.reader.hasMessageAvailable();
-        } catch (PulsarClientException e) {
+            assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(msg.getData());
+        } catch (IOException e) {
+            log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(),
+                    e);
+            // TODO: find a better way to handle bad request
             throw new RuntimeException(e);
         }
-        if (!hasMessageAvailable) {
-            Request.AssignmentsUpdate assignmentsUpdate;
-            try {
-                assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(msg.getData());
-            } catch (IOException e) {
-                log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(),
-                        e);
-                // TODO: find a better way to handle bad request
-                throw new RuntimeException(e);
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Received assignment update: {}", assignmentsUpdate);
+        if (log.isDebugEnabled()) {
+            log.debug("Received assignment update: {}", assignmentsUpdate);
+        }
+
+        // clear previous version assignments and ack all previous messages
+        if (currentVersion < assignmentsUpdate.getVersion()) {
+            currentVersionAssignments.clear();
+            // ack the outdated version to avoid processing again
+            if (previousOldAssignmentMsgId != null && this.functionRuntimeManager.isActiveRuntimeConsumer.get()) {
+                this.functionRuntimeManager.assignmentConsumer.acknowledgeCumulativeAsync(previousOldAssignmentMsgId);
             }
+        }
 
-            this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), assignmentsUpdate);
+        currentVersionAssignments.add(assignmentsUpdate);
+        
+        // process only if the latest message
+        if (!hasMessageAvailable()) {
+            this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), currentVersionAssignments);
+            // function-runtime manager has processed all assignments in the topic at least once.. so scheduled-manager
+            // can only publish any new assignment with latest processed version
+            this.functionRuntimeManager.initialized = true;
         }
+
+        currentVersion = assignmentsUpdate.getVersion();
+        previousOldAssignmentMsgId = msg.getMessageId();
         // receive next request
         receiveOne();
     }
@@ -99,4 +123,12 @@ public Void apply(Throwable cause) {
         // TODO: find a better way to handle consumer functions
         throw new RuntimeException(cause);
     }
+    
+    private boolean hasMessageAvailable() {
+        try {
+            return this.reader.hasMessageAvailable();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 93828de40d..452b51c52b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -26,10 +26,16 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -38,6 +44,7 @@
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.inferred.freebuilder.shaded.org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 
 import javax.ws.rs.WebApplicationException;
@@ -58,6 +65,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 /**
@@ -94,6 +102,11 @@
     private final ConnectorsManager connectorsManager;
     
     private final PulsarAdmin functionAdmin;
+    
+    private static final String ASSIGNMENT_TOPIC_SUBSCRIPTION = "pulsar.functions";
+    protected AtomicBoolean isActiveRuntimeConsumer = new AtomicBoolean(false);
+    protected Consumer<byte[]> assignmentConsumer;
+    protected volatile boolean initialized = false;
 
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
             MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception {
@@ -103,7 +116,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
 
         Reader<byte[]> reader = workerService.getClient().newReader()
                 .topic(this.workerConfig.getFunctionAssignmentTopic())
-                .startMessageId(MessageId.earliest)
+                .startMessageId(getLatestAssignmentMsgId(workerConfig, workerService))
                 .create();
 
         this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader);
@@ -132,6 +145,20 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
         } else {
             throw new RuntimeException("Either Thread or Process Container Factory need to be set");
         }
+        
+        this.assignmentConsumer = workerService.getClient().newConsumer()
+                .topic(this.workerConfig.getFunctionAssignmentTopic()).subscriptionName(ASSIGNMENT_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover).consumerEventListener(new ConsumerEventListener() {
+                    private static final long serialVersionUID = 1L;
+
+                    public void becameActive(Consumer<?> consumer, int partitionId) {
+                        isActiveRuntimeConsumer.set(true);
+                    }
+
+                    public void becameInactive(Consumer<?> consumer, int partitionId) {
+                        isActiveRuntimeConsumer.set(false);
+                    }
+                }).subscribe();
 
         this.actionQueue = new LinkedBlockingQueue<>();
 
@@ -141,6 +168,29 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
         this.membershipManager = membershipManager;
     }
 
+    private MessageId getLatestAssignmentMsgId(WorkerConfig workerConfig, WorkerService workerService) {
+        try {
+            PulsarAdmin admin = workerService.getBrokerAdmin();
+            PersistentTopicInternalStats topicStats = admin.topics()
+                    .getInternalStats(workerConfig.getFunctionAssignmentTopic());
+            if (topicStats != null && topicStats.cursors != null) {
+                CursorStats cursor = topicStats.cursors.get(ASSIGNMENT_TOPIC_SUBSCRIPTION);
+                if (cursor != null && StringUtils.isNotBlank(cursor.markDeletePosition)) {
+                    String[] ids = cursor.markDeletePosition.split(":");
+                    if (ids.length == 2) {
+                        MessageIdImpl msgId = new MessageIdImpl(Long.parseLong(ids[0]), Long.parseLong(ids[1]), -1);
+                        log.info("Assignment-reader starts reading from {}", msgId);
+                        return msgId;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Failed to get assignment-msg id for runtime-manager for {}-{}",
+                    workerConfig.getFunctionAssignmentTopic(), ASSIGNMENT_TOPIC_SUBSCRIPTION, e);
+        }
+        return MessageId.earliest;
+    }
+
     /**
      * Starts the function runtime manager
      */
@@ -450,6 +500,12 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro
         return functionStatusListBuilder.build();
     }
 
+    public synchronized void processAssignmentUpdate(MessageId messageId, List<AssignmentsUpdate> assignmentsUpdates) {
+        assignmentsUpdates.forEach(assignmentsUpdate -> {
+            processAssignmentUpdate(messageId, assignmentsUpdate);
+        });
+    }
+    
     /**
      * Process an assignment update from the assignment topic
      * @param messageId the message id of the update assignment
@@ -548,9 +604,10 @@ public synchronized void processAssignmentUpdate(MessageId messageId, Assignment
 
             // set as current assignment
             this.currentAssignmentVersion = assignmentsUpdate.getVersion();
-
         } else {
-            log.debug("Received out of date assignment update: {}", assignmentsUpdate);
+            if (log.isDebugEnabled()) {
+                log.debug("Received out of date assignment update: {}", assignmentsUpdate);
+            }
         }
     }
 
@@ -689,4 +746,8 @@ public void close() throws Exception {
     private FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
         return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
     }
+    
+    public boolean isInitialized() {
+        return initialized;
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index b18fd12881..509c9251a5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -21,6 +21,7 @@
 import com.google.common.annotations.VisibleForTesting;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -43,6 +44,7 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index ed00958d7e..40ab1324d0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -47,9 +47,13 @@
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
+import com.google.common.collect.Iterables;
+
 @Slf4j
 public class SchedulerManager implements AutoCloseable {
 
+    private static final int MAX_ASSIGNMENTS_IN_MSG = 2000;
+
     private final WorkerConfig workerConfig;
 
     @Setter
@@ -99,6 +103,14 @@ public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient) {
     }
 
     private void invokeScheduler() {
+        
+        // publish new assignment only once function-runtime manager updates previously updated version so,
+        // scheduled-manager can publish new assignment with updated version at FunctionRuntimeManager
+        if (!functionRuntimeManager.isInitialized()) {
+            schedule();
+            return;
+        }
+        
         List<String> currentMembership = this.membershipManager.getCurrentMembership()
                 .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList());
 
@@ -143,21 +155,12 @@ private void invokeScheduler() {
         List<Assignment> assignments = this.scheduler.schedule(
                 needsAssignment, currentAssignments, currentMembership);
 
-        log.debug("New assignments computed: {}", assignments);
+        if (log.isDebugEnabled()) {
+            log.debug("New assignments computed: {}", assignments);
+        }
 
         long assignmentVersion = this.functionRuntimeManager.getCurrentAssignmentVersion() + 1;
-        Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder()
-                .setVersion(assignmentVersion)
-                .addAllAssignments(assignments)
-                .build();
-
-        CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(assignmentsUpdate.toByteArray());
-        try {
-            messageIdCompletableFuture.get();
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Failed to send assignment update", e);
-            throw new RuntimeException(e);
-        }
+        publishAssignmentUpdate(assignmentVersion, assignments);
 
         // wait for assignment update to go throw the pipeline
         int retries = 0;
@@ -176,6 +179,23 @@ private void invokeScheduler() {
         }
     }
 
+    private void publishAssignmentUpdate(long assignmentVersion, List<Assignment> assignments) {
+
+        Iterable<List<Assignment>> batches = Iterables.partition(assignments, MAX_ASSIGNMENTS_IN_MSG);
+        batches.forEach(assignmentBatch -> {
+            Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder()
+                    .setVersion(assignmentVersion).addAllAssignments(assignmentBatch).build();
+            CompletableFuture<MessageId> messageIdCompletableFuture = producer
+                    .sendAsync(assignmentsUpdate.toByteArray());
+            try {
+                messageIdCompletableFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Failed to send assignment update", e);
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
     public static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions) {
         Map<String, Function.Instance> functionInstances = new HashMap<>();
         for (FunctionMetaData functionMetaData : allFunctions) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index cb73eaa16f..4d6ba5f7dc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -97,32 +97,10 @@ private static URI initialize(WorkerConfig workerConfig)
         // getting namespace policy
         log.info("Initializing Pulsar Functions namespace...");
         try {
-            try {
-                admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
-            } catch (PulsarAdminException e) {
-                if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
-                    // if not found than create
-                    try {
-                        Policies policies = new Policies();
-                        policies.retention_policies = new RetentionPolicies(-1, -1);
-                        policies.replication_clusters = new HashSet<>();
-                        policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster());
-                        admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(),
-                                policies);
-                    } catch (PulsarAdminException e1) {
-                        // prevent race condition with other workers starting up
-                        if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) {
-                            log.error("Failed to create namespace {} for pulsar functions", workerConfig
-                                    .getPulsarFunctionsNamespace(), e1);
-                            throw e1;
-                        }
-                    }
-                } else {
-                    log.error("Failed to get retention policy for pulsar function namespace {}",
-                            workerConfig.getPulsarFunctionsNamespace(), e);
-                    throw e;
-                }
-            }
+            createNamespace(admin, workerConfig.getPulsarFunctionsCluster(), workerConfig.getPulsarFunctionsNamespace(),
+                    true);
+            createNamespace(admin, workerConfig.getPulsarFunctionsCluster(),
+                    workerConfig.getPulsarAssignmentNamespace(), false);
             try {
                 internalConf = admin.brokers().getInternalConfigurationData();
             } catch (PulsarAdminException e) {
@@ -146,6 +124,35 @@ private static URI initialize(WorkerConfig workerConfig)
         }
     }
 
+    private static void createNamespace(PulsarAdmin admin, String cluster, String namespace, boolean infiniteRetention)
+            throws PulsarAdminException {
+        try {
+            admin.namespaces().getPolicies(namespace);
+        } catch (PulsarAdminException e) {
+            if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
+                // if not found than create
+                try {
+                    Policies policies = new Policies();
+                    if (infiniteRetention) {
+                        policies.retention_policies = new RetentionPolicies(-1, -1);
+                    }
+                    policies.replication_clusters = new HashSet<>();
+                    policies.replication_clusters.add(cluster);
+                    admin.namespaces().createNamespace(namespace, policies);
+                } catch (PulsarAdminException e1) {
+                    // prevent race condition with other workers starting up
+                    if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) {
+                        log.error("Failed to create namespace {} for pulsar functions", namespace, e1);
+                        throw e1;
+                    }
+                }
+            } else {
+                log.error("Failed to get retention policy for pulsar function namespace {}", namespace, e);
+                throw e;
+            }
+        }
+    }
+
     @Override
     protected void doStop() {
         if (null != this.server) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 0f695a974f..74ccd7c7dd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -61,6 +61,7 @@
     private String pulsarWebServiceUrl;
     private String clusterCoordinationTopicName;
     private String pulsarFunctionsNamespace;
+    private String pulsarAssignmentNamespace;
     private String pulsarFunctionsCluster;
     private int numFunctionPackageReplicas;
     private String downloadDirectory;
@@ -133,7 +134,7 @@ public String getClusterCoordinationTopic() {
     }
 
     public String getFunctionAssignmentTopic() {
-        return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionAssignmentTopicName);
+        return String.format("persistent://%s/%s", pulsarAssignmentNamespace, functionAssignmentTopicName);
     }
 
     public static WorkerConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 85a2122dd0..c19d208511 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -20,10 +20,16 @@
 
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.Request;
@@ -45,6 +51,7 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class FunctionRuntimeManagerTest {
 
@@ -81,7 +88,7 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception {
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -178,7 +185,7 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -279,7 +286,7 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -394,4 +401,24 @@ public boolean matches(Object o) {
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3);
     }
+    
+    private static PulsarClient mockPulsarClient() throws PulsarClientException {
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+        ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
+        ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class);
+
+        when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder);
+
+        when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+
+        when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
+
+        when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+        
+        return mockClient;
+    }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 2753bf196a..26e0540e4c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -76,7 +76,6 @@ public void testConsumerEventListener() throws Exception {
         when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
         when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
         when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder);
-
         when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
         WorkerService workerService = mock(WorkerService.class);
         doReturn(workerConfig).when(workerService).getWorkerConfig();
@@ -120,7 +119,7 @@ private static PulsarClient mockPulsarClient() throws PulsarClientException {
         when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
 
         when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
-
+        
         return mockClient;
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 19977bd812..60341240d1 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -111,6 +111,7 @@ public void setup() throws PulsarClientException {
 
         schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient));
         functionRuntimeManager = mock(FunctionRuntimeManager.class);
+        when(functionRuntimeManager.isInitialized()).thenReturn(true);
         functionMetaDataManager = mock(FunctionMetaDataManager.class);
         membershipManager = mock(MembershipManager.class);
         schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
@@ -145,6 +146,7 @@ public void testSchedule() throws Exception {
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -186,6 +188,7 @@ public void testNothingNewToSchedule() throws Exception {
         assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -242,6 +245,7 @@ public void testAddingFunctions() throws Exception {
         assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -313,6 +317,7 @@ public void testDeletingFunctions() throws Exception {
 
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -372,6 +377,7 @@ public void testScalingUp() throws Exception {
 
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -478,6 +484,7 @@ public void testScalingDown() throws Exception {
 
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
@@ -646,6 +653,7 @@ public void testUpdate() throws Exception {
 
         //set version
         doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
+        doReturn(true).when(functionRuntimeManager).isInitialized();
 
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message