pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peng...@apache.org
Subject [pulsar] 04/05: Reduce the probability of cache inconsistencies (#11423)
Date Fri, 30 Jul 2021 05:17:57 GMT
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9f88b8ac02b8f65db5166ecf532d51912190bc55
Author: feynmanlin <feynmanlin@tencent.com>
AuthorDate: Sat Jul 24 04:48:02 2021 +0800

    Reduce the probability of cache inconsistencies (#11423)
    
    Currently, when updating the function metadata, the local cache is updated first and the message is then sent
to the topic.
    As a result, the local cache may be updated successfully while sending the message
fails, leaving the cache inconsistent with the topic.
    
    Send the message first, and update the local cache only after the send succeeds.
    
    (cherry picked from commit 5819242e2240deb834acc865ee0e9b79821992b1)
---
 .../functions/worker/FunctionMetaDataManager.java  | 33 +++++++++--
 .../worker/FunctionMetaDataManagerTest.java        | 67 ++++++++++++++++++++--
 2 files changed, 90 insertions(+), 10 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index b5ed5eb..1ff1b2b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -28,7 +28,6 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
@@ -212,12 +211,9 @@ public class FunctionMetaDataManager implements AutoCloseable {
         if (exclusiveLeaderProducer == null) {
             throw new IllegalStateException("Not the leader");
         }
+        // Check first to avoid local cache update failure
+        checkRequestOutDated(functionMetaData, delete);
 
-        if (delete) {
-            needsScheduling = proccessDeregister(functionMetaData);
-        } else {
-            needsScheduling = processUpdate(functionMetaData);
-        }
         byte[] toWrite;
         if (workerConfig.getUseCompactedMetadataTopic()) {
             if (delete) {
@@ -243,6 +239,11 @@ public class FunctionMetaDataManager implements AutoCloseable {
                 builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
             }
             lastMessageSeen = builder.send();
+            if (delete) {
+                needsScheduling = proccessDeregister(functionMetaData);
+            } else {
+                needsScheduling = processUpdate(functionMetaData);
+            }
         } catch (Exception e) {
             log.error("Could not write into Function Metadata topic", e);
             throw new IllegalStateException("Internal Error updating function at the leader",
e);
@@ -253,6 +254,22 @@ public class FunctionMetaDataManager implements AutoCloseable {
         }
     }
 
+    private void checkRequestOutDated(FunctionMetaData functionMetaData, boolean delete)
{
+        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
+        if (isRequestOutdated(details.getTenant(), details.getNamespace(),
+                details.getName(), functionMetaData.getVersion())) {
+            if (log.isDebugEnabled()) {
+                log.debug("{}/{}/{} Ignoring outdated request version: {}", details.getTenant(),
details.getNamespace(),
+                        details.getName(), functionMetaData.getVersion());
+            }
+            if (delete) {
+                throw new IllegalArgumentException(
+                        "Delete request ignored because it is out of date. Please try again.");
+            }
+            throw new IllegalArgumentException("Update request ignored because it is out
of date. Please try again.");
+        }
+    }
+
     /**
      * Acquires a exclusive producer.  This method cannot return null.  It can only return
a valid exclusive producer
      * or throw NotLeaderAnymore exception.
@@ -455,6 +472,10 @@ public class FunctionMetaDataManager implements AutoCloseable {
     }
 
     private boolean isRequestOutdated(String tenant, String namespace, String functionName,
long version) {
+        // avoid NPE
+        if(!containsFunctionMetaData(tenant, namespace, functionName)){
+            return false;
+        }
         FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(tenant)
                 .get(namespace).get(functionName);
         return currentFunctionMetaData.getVersion() >= version;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 55f4222..3fef9f3 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -20,16 +20,31 @@ package org.apache.pulsar.functions.worker;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.*;
-
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Request;
 import org.testng.Assert;
@@ -108,6 +123,50 @@ public class FunctionMetaDataManagerTest {
     }
 
     @Test
+    public void testSendMsgFailWithCompaction() throws Exception {
+        testSendMsgFail(true);
+    }
+
+    @Test
+    public void testSendMsgFailWithoutCompaction() throws Exception {
+        testSendMsgFail(false);
+    }
+
+    private void testSendMsgFail(boolean compact) throws Exception {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setUseCompactedMetadataTopic(compact);
+        FunctionMetaDataManager functionMetaDataManager = spy(
+                new FunctionMetaDataManager(workerConfig,
+                        mock(SchedulerManager.class),
+                        mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
+        Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
+                .setVersion(1)
+                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();
+
+        // become leader
+        Producer<byte[]> exclusiveProducer = spy(functionMetaDataManager.acquireExclusiveWrite(()
-> true));
+        // make sure send msg fail
+        functionMetaDataManager.acquireLeadership(exclusiveProducer);
+        exclusiveProducer.close();
+        when(exclusiveProducer.newMessage()).thenThrow(new RuntimeException("should failed"));
+        try {
+            functionMetaDataManager.updateFunctionOnLeader(m1, false);
+            fail("should failed");
+        } catch (Exception e) {
+            assertTrue(e.getCause().getMessage().contains("should failed"));
+        }
+        assertEquals(functionMetaDataManager.getAllFunctionMetaData().size(), 0);
+        try {
+            functionMetaDataManager.updateFunctionOnLeader(m1, true);
+            fail("should failed");
+        } catch (Exception e) {
+            assertTrue(e.getCause().getMessage().contains("should failed"));
+        }
+        assertEquals(functionMetaDataManager.getAllFunctionMetaData().size(), 0);
+    }
+
+    @Test
     public void testUpdateIfLeaderFunctionWithoutCompaction() throws Exception {
         testUpdateIfLeaderFunction(false);
     }

Mime
View raw message