pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [pulsar] jerrypeng commented on a change in pull request #7255: Re-work Function MetaDataManager to make all metadata writes only by the leader
Date Mon, 15 Jun 2020 19:22:52 GMT

jerrypeng commented on a change in pull request #7255:
URL: https://github.com/apache/pulsar/pull/7255#discussion_r440394613



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace,
St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData
functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean
delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE
: Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata,
functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
-    /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs
to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String
tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData,
functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(deregisterRequest);
+    // Note that this method cannot be syncrhonized because the tailer might still be processing
messages
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            tailer.stopWhenNoMoreMessages();
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
     }
 
-    /**
-     * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs
to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String
tenant, String namespace, String functionName,
-                                                                                      Integer
instanceId, boolean start) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData,
instanceId, start);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
+    }
 
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+        } catch (PulsarClientException e) {
+            log.error("Error closing exclusive producer", e);
+            errorNotifier.triggerError(e);
+        }
+        exclusiveLeaderProducer = null;
+        initializeTailer();

Review comment:
       It is not a good idea to simply re-create the metadata tailer object.  This will cause
the tailer to start reading from the beginning, forcing it to re-read the whole topic.  The
problem is not only that re-reading the whole topic can take a long time, but also that
during that time the in-memory metadata map for the worker will be inconsistent.  If a user
request is sent to the worker during this time, the behavior might be incorrect.  I would
suggest keeping track of the message ID that marks the point in the topic to which the
current view of the metadata corresponds.  When the worker is the leader and it updates its
in-memory metadata cache and produces messages to the metadata topic, we should update that
message ID.  When the worker loses leadership, the tailer should start reading from that
message ID instead of from the beginning.

##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -168,68 +151,78 @@ public synchronized boolean containsFunction(String tenant, String namespace,
St
         return containsFunctionMetaData(tenant, namespace, functionName);
     }
 
-    /**
-     * Sends an update request to the FMT (Function Metadata Topic)
-     * @param functionMetaData The function metadata that needs to be updated
-     * @return a completable future of when the update has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData
functionMetaData) {
-
-        FunctionMetaData existingFunctionMetadata = null;
-        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
-                functionMetaData.getFunctionDetails().getNamespace(),
-                functionMetaData.getFunctionDetails().getName())) {
-            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
-                    functionMetaData.getFunctionDetails().getNamespace(),
-                    functionMetaData.getFunctionDetails().getName());
+    public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean
delete)
+            throws IllegalStateException, IllegalArgumentException {
+        if (exclusiveLeaderProducer == null) {
+            throw new IllegalStateException("Not the leader");
+        }
+        boolean needsScheduling;
+        if (delete) {
+            needsScheduling = proccessDeregister(functionMetaData);
+        } else {
+            needsScheduling = processUpdate(functionMetaData);
+        }
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE
: Request.ServiceRequest.ServiceRequestType.UPDATE)
+                .setFunctionMetaData(functionMetaData)
+                .setWorkerId(workerConfig.getWorkerId())
+                .setRequestId(UUID.randomUUID().toString())
+                .build();
+        try {
+            exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+        } catch (Exception e) {
+            log.error("Could not write into Function Metadata topic", e);
+            errorNotifier.triggerError(e);
+        }
+        if (needsScheduling) {
+            this.schedulerManager.schedule();
         }
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata,
functionMetaData);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(updateRequest);
     }
 
-
-    /**
-     * Sends a deregister request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs
to
-     * @param functionName the name of the function
-     * @return a completable future of when the deregister has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> deregisterFunction(String
tenant, String namespace, String functionName) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData,
functionMetaData);
-
-        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
-
-        return submit(deregisterRequest);
+    // Note that this method cannot be syncrhonized because the tailer might still be processing
messages
+    public void acquireLeadership() {
+        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+        FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+        // Now that we have created the exclusive producer, wait for reader to get over
+        if (tailer != null) {
+            tailer.stopWhenNoMoreMessages();
+            tailer.close();
+        }
+        this.schedulerManager.schedule();
+        log.info("FunctionMetaDataManager done becoming leader by doing its first schedule");
     }
 
-    /**
-     * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
-     * @param tenant the tenant the function that needs to be deregistered belongs to
-     * @param namespace the namespace the function that needs to be deregistered belongs
to
-     * @param functionName the name of the function
-     * @param instanceId the instanceId of the function, -1 if for all instances
-     * @param start do we need to start or stop
-     * @return a completable future of when the start/stop has been applied
-     */
-    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String
tenant, String namespace, String functionName,
-                                                                                      Integer
instanceId, boolean start) {
-        FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
-        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData,
instanceId, start);
-
-        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
-                this.workerConfig.getWorkerId(), newFunctionMetaData);
+    private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+        if (exclusiveLeaderProducer == null) {
+            try {
+                exclusiveLeaderProducer = pulsarClient.newProducer()
+                        .topic(this.workerConfig.getFunctionMetadataTopic())
+                        .producerName(workerConfig.getWorkerId() + "-leader")
+                        // .type(EXCLUSIVE)
+                        .create();
+            } catch (PulsarClientException e) {
+                log.error("Error creating exclusive producer", e);
+                errorNotifier.triggerError(e);
+            }
+        } else {
+            log.error("Logic Error in FunctionMetaData Manager");
+            errorNotifier.triggerError(new IllegalStateException());
+        }
+        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+        this.functionMetaDataTopicTailer = null;
+        return tailer;
+    }
 
-        return submit(updateRequest);
+    public synchronized void giveupLeadership() {
+        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+        try {
+            exclusiveLeaderProducer.close();
+        } catch (PulsarClientException e) {
+            log.error("Error closing exclusive producer", e);
+            errorNotifier.triggerError(e);
+        }
+        exclusiveLeaderProducer = null;
+        initializeTailer();

Review comment:
       It is not a good idea to simply re-create the metadata tailer object.  This will cause
the tailer to start reading from the beginning, forcing it to re-read the whole topic.  The
problem is not only that re-reading the whole topic can take a long time, but also that
during that time the in-memory metadata map for the worker will be inconsistent.  If a user
request is sent to the worker during this time, the behavior might be incorrect.  I would
suggest keeping track of the message ID that marks the point in the topic to which the
current view of the metadata corresponds.  When the worker is the leader and it updates its
in-memory metadata cache and produces messages to the metadata topic, we should update that
message ID.  When the worker loses leadership, the tailer should start reading from that
message ID instead of from the beginning.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message